From 9df9c42eae3a677c9fae3182c86cd9137ff79a0e Mon Sep 17 00:00:00 2001 From: Jonathan Halterman Date: Wed, 30 Mar 2016 21:03:57 -0700 Subject: [PATCH] Added support for CheckedRunnable. Documented Listeners. Documented RecurrentFuture callbacks. --- CHANGES.md | 19 ++++-- README.md | 62 +++++++++---------- .../net/jodah/recurrent/AsyncCallable.java | 18 ++++++ .../net/jodah/recurrent/AsyncListeners.java | 33 +++++++++- .../java/net/jodah/recurrent/Callables.java | 15 +++++ .../net/jodah/recurrent/CheckedRunnable.java | 10 +++ .../java/net/jodah/recurrent/Listeners.java | 60 ++++++++++++++++++ .../java/net/jodah/recurrent/Recurrent.java | 55 ++++++++-------- .../jodah/recurrent/RecurrentException.java | 13 ++++ .../net/jodah/recurrent/RecurrentFuture.java | 58 +++++++++++++++++ .../java/net/jodah/recurrent/Asserts.java | 2 +- .../jodah/recurrent/AsyncListenersTest.java | 4 +- .../net/jodah/recurrent/ListenersTest.java | 16 +++++ .../net/jodah/recurrent/RecurrentTest.java | 14 ++--- 14 files changed, 304 insertions(+), 75 deletions(-) create mode 100644 src/main/java/net/jodah/recurrent/CheckedRunnable.java create mode 100644 src/main/java/net/jodah/recurrent/RecurrentException.java diff --git a/CHANGES.md b/CHANGES.md index 1cd5e6aa..5d466d40 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,14 @@ +# 0.5.0 + +### New Features + +* Added support for synchronous and asynchronous event listeners +* Added support for `CheckedRunnable` + +### API Changes + +* The `Recurrent.run` methods now require a `CheckedRunnable` rather than `Runnable`. This allows Recurrent to be used on code that throws checked exceptions without having to wrap the code in try/catch blocks. + # 0.4.0 ### New Features @@ -6,18 +17,18 @@ ### API Changes -* New Invocation and AsyncInvocation APIs +* New Invocation and `AsyncInvocation` APIs # 0.3.3 ### New Features -* Add Scheduler API -* Make RetryPolicy copyable +* Add `Scheduler` API +* Make `RetryPolicy` copyable ### Behavior Changes -* Require ContextualCallable and ContextualRunnable to be manually retried +* Require `ContextualCallable` and `ContextualRunnable` to be manually retried * Add support for checking multiple retry policy conditions ### API Changes diff --git a/README.md b/README.md index 8e4ed9b1..f55e98bb 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ Recurrent is a simple, zero-dependency library for performing retries. It featur * [Synchronous](synchronous-retries) and [asynchronous retries](#asynchronous-retries) * [Asynchronous API integration](#asynchronous-api-integration) * [CompletableFuture](#completablefuture-integration) and [Java 8 functional interface](#java-8-functional-interfaces) integration -* [Invocation Tracking](#invocation-tracking) * [Event Listeners](#event-listeners) +* [Invocation Tracking](#invocation-tracking) Supports Java 6+ though the documentation uses lambdas for simplicity. @@ -135,34 +135,6 @@ CompletableFuture.supplyAsync(() -> Recurrent.get(() -> "foo", retryPolicy)) .thenApplyAsync(value -> Recurrent.get(() -> value + "bar", retryPolicy)); ``` -#### Invocation Tracking - -In addition to automatically performing retries, Recurrent can be used to track invocations for you, allowing you to manually retry as needed: - -```java -Invocation invocation = new Invocation(retryPolicy); -while (!invocation.isComplete()) { - try { - doSomething(); - invocation.complete() - } catch (ConnectException e) { - invocation.recordFailure(e); - } -} -``` - -Invocation tracking is also useful for integrating with APIs that have their own retry mechanism: - -```java -Invocation invocation = new Invocation(retryPolicy); - -// On failure -if (invocation.canRetryOn(someFailure)) - service.scheduleRetry(invocation.getWaitMillis(), TimeUnit.MILLISECONDS); -``` - -See the [RxJava example][RxJava] for a more detailed implementation. - #### Event Listeners Recurrent supports event listeners that can be notified when retries are performed and when invocations complete: @@ -185,14 +157,42 @@ Recurrent.get(() -> connect(), retryPolicy, new Listeners() { Java 8 users can register individual listeners using lambdas: ```java -Recurrent.get(() -> connect(), retryPolicy, new Listeners() +Recurrent.get(() -> connect(), retryPolicy, new Listeners() .whenRetry((c, f, stats) -> log.warn("Failure #{}. Retrying.", stats.getAttemptCount())) - .whenFailure((cxn, failure) -> log.error("Connection attempts failed", failure))) + .whenFailure((cxn, failure) -> log.error("Connection attempts failed", failure)) .whenSuccess(cxn -> log.info("Connected to {}", cxn))); ``` Additional listeners are available via the [Listeners] and [AsyncListeners] classes. Asynchronous completion listeners can be registered via [RecurrentFuture]. +#### Invocation Tracking + +In addition to automatically performing retries, Recurrent can be used to track invocations for you, allowing you to manually retry as needed: + +```java +Invocation invocation = new Invocation(retryPolicy); +while (!invocation.isComplete()) { + try { + doSomething(); + invocation.complete() + } catch (ConnectException e) { + invocation.recordFailure(e); + } +} +``` + +Invocation tracking is also useful for integrating with APIs that have their own retry mechanism: + +```java +Invocation invocation = new Invocation(retryPolicy); + +// On failure +if (invocation.canRetryOn(someFailure)) + service.scheduleRetry(invocation.getWaitMillis(), TimeUnit.MILLISECONDS); +``` + +See the [RxJava example][RxJava] for a more detailed implementation. + ## Example Integrations Recurrent was designed to integrate nicely with existing libraries. Here are some example integrations: diff --git a/src/main/java/net/jodah/recurrent/AsyncCallable.java b/src/main/java/net/jodah/recurrent/AsyncCallable.java index fb774ef8..394b8df7 100644 --- a/src/main/java/net/jodah/recurrent/AsyncCallable.java +++ b/src/main/java/net/jodah/recurrent/AsyncCallable.java @@ -82,6 +82,24 @@ public T call() throws Exception { } }; } + + static AsyncCallable of(final CheckedRunnable runnable) { + Assert.notNull(runnable, "runnable"); + return new AsyncCallable() { + @Override + public T call() throws Exception { + try { + invocation.reset(); + runnable.run(); + invocation.completeOrRetry(null, null); + } catch (Exception e) { + invocation.completeOrRetry(null, e); + } + + return null; + } + }; + } static AsyncCallable ofFuture(final Callable> callable) { Assert.notNull(callable, "callable"); diff --git a/src/main/java/net/jodah/recurrent/AsyncListeners.java b/src/main/java/net/jodah/recurrent/AsyncListeners.java index ba5aae01..7141890c 100644 --- a/src/main/java/net/jodah/recurrent/AsyncListeners.java +++ b/src/main/java/net/jodah/recurrent/AsyncListeners.java @@ -2,6 +2,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import net.jodah.recurrent.event.ContextualResultListener; @@ -9,8 +10,8 @@ import net.jodah.recurrent.internal.util.Assert; /** - * Recurrent event listeners that are called asynchronously. To handle completion events asynchronously, see - * {@link RecurrentFuture}. + * Recurrent event listeners that are called asynchronously on the {@link Scheduler} or {@link ScheduledExecutorService} + * associated with the Recurrent call. To handle completion events asynchronously, see {@link RecurrentFuture}. * * @author Jonathan Halterman * @param result type @@ -81,44 +82,72 @@ private static void call(Callable callable, ExecutorService executor, Schedul scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS); } + /** + * Registers the {@code listener} to be called asynchronously after a failed invocation attempt. + */ public AsyncListeners whenFailedAttemptAsync(ContextualResultListener listener) { asyncCtxFailedAttemptListener = new AsyncCtxResultListener(listener); return this; } + /** + * Registers the {@code listener} to be called asynchronously after a failed invocation attempt. + */ public AsyncListeners whenFailedAttemptAsync(ContextualResultListener listener, ExecutorService executor) { asyncCtxFailedAttemptListener = new AsyncCtxResultListener(listener, executor); return this; } + /** + * Registers the {@code listener} to be called asynchronously after a failed invocation attempt. + */ public AsyncListeners whenFailedAttemptAsync(ResultListener listener) { asyncFailedAttemptListener = new AsyncResultListener(listener); return this; } + /** + * Registers the {@code listener} to be called asynchronously after a failed invocation attempt. + */ public AsyncListeners whenFailedAttemptAsync(ResultListener listener, ExecutorService executor) { asyncFailedAttemptListener = new AsyncResultListener(listener, executor); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public AsyncListeners whenRetryAsync(ContextualResultListener listener) { asyncCtxRetryListener = new AsyncCtxResultListener(listener); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public AsyncListeners whenRetryAsync(ContextualResultListener listener, ExecutorService executor) { asyncCtxRetryListener = new AsyncCtxResultListener(listener, executor); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public AsyncListeners whenRetryAsync(ResultListener listener) { asyncRetryListener = new AsyncResultListener(listener); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public AsyncListeners whenRetryAsync(ResultListener listener, ExecutorService executor) { asyncRetryListener = new AsyncResultListener(listener, executor); diff --git a/src/main/java/net/jodah/recurrent/Callables.java b/src/main/java/net/jodah/recurrent/Callables.java index 96774e80..8e589064 100644 --- a/src/main/java/net/jodah/recurrent/Callables.java +++ b/src/main/java/net/jodah/recurrent/Callables.java @@ -43,4 +43,19 @@ public Void call() { } }; } + + static Callable of(final CheckedRunnable runnable) { + Assert.notNull(runnable, "runnable"); + return new Callable() { + @Override + public Void call() { + try { + runnable.run(); + return null; + } catch (Exception e) { + throw new RecurrentException(e); + } + } + }; + } } diff --git a/src/main/java/net/jodah/recurrent/CheckedRunnable.java b/src/main/java/net/jodah/recurrent/CheckedRunnable.java new file mode 100644 index 00000000..784427b7 --- /dev/null +++ b/src/main/java/net/jodah/recurrent/CheckedRunnable.java @@ -0,0 +1,10 @@ +package net.jodah.recurrent; + +/** + * A Runnable that throws checked exceptions. + * + * @author Jonathan Halterman + */ +public interface CheckedRunnable { + void run() throws Exception; +} diff --git a/src/main/java/net/jodah/recurrent/Listeners.java b/src/main/java/net/jodah/recurrent/Listeners.java index 0a39b69a..c5b82580 100644 --- a/src/main/java/net/jodah/recurrent/Listeners.java +++ b/src/main/java/net/jodah/recurrent/Listeners.java @@ -44,110 +44,170 @@ public void onResult(T result, Throwable failure) { }; } + /** + * Called when an invocation is completed. + */ public void onComplete(T result, Throwable failure) { if (completeListener != null) completeListener.onResult(result, failure); } + /** + * Called when an invocation is completed. + */ public void onComplete(T result, Throwable failure, InvocationStats stats) { if (ctxCompleteListener != null) ctxCompleteListener.onResult(result, failure, stats); } + /** + * Called after a failed attempt. + */ public void onFailedAttempt(T result, Throwable failure) { if (failedAttemptListener != null) failedAttemptListener.onResult(result, failure); } + /** + * Called after a failed attempt. + */ public void onFailedAttempt(T result, Throwable failure, InvocationStats stats) { if (ctxFailedAttemptListener != null) ctxFailedAttemptListener.onResult(result, failure, stats); } + /** + * Called after the retry policy is exceeded and the result is a failure. + */ public void onFailure(T result, Throwable failure) { if (failureListener != null) failureListener.onResult(result, failure); } + /** + * Called after the retry policy is exceeded and the result is a failure. + */ public void onFailure(T result, Throwable failure, InvocationStats stats) { if (ctxFailureListener != null) ctxFailureListener.onResult(result, failure, stats); } + /** + * Called before a retry is attempted. + */ public void onRetry(T result, Throwable failure) { if (retryListener != null) retryListener.onResult(result, failure); } + /** + * Called before a retry is attempted. + */ public void onRetry(T result, Throwable failure, InvocationStats stats) { if (ctxRetryListener != null) ctxRetryListener.onResult(result, failure, stats); } + /** + * Called after a successful invocation. + */ public void onSuccess(T result) { if (successListener != null) successListener.onSuccess(result); } + /** + * Called after a successful invocation. + */ public void onSuccess(T result, InvocationStats stats) { if (ctxSuccessListener != null) ctxSuccessListener.onSuccess(result, stats); } + /** + * Registers the {@code listener} to be called when an invocation is completed. + */ @SuppressWarnings("unchecked") public Listeners whenComplete(ContextualResultListener listener) { ctxCompleteListener = (ContextualResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called when an invocation is completed. + */ @SuppressWarnings("unchecked") public Listeners whenComplete(ResultListener listener) { completeListener = (ResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called after a failed invocation attempt. + */ @SuppressWarnings("unchecked") public Listeners whenFailedAttempt(ContextualResultListener listener) { ctxFailedAttemptListener = (ContextualResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called after a failed invocation attempt. + */ @SuppressWarnings("unchecked") public Listeners whenFailedAttempt(ResultListener listener) { failedAttemptListener = (ResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called after an invocation attempt fails. + */ @SuppressWarnings("unchecked") public Listeners whenFailure(ContextualResultListener listener) { ctxFailureListener = (ContextualResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called when the retry policy is exceeded and the result is a failure. + */ @SuppressWarnings("unchecked") public Listeners whenFailure(ResultListener listener) { failureListener = (ResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called when the retry policy is exceeded and the result is a failure. + */ @SuppressWarnings("unchecked") public Listeners whenRetry(ContextualResultListener listener) { ctxRetryListener = (ContextualResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called before a retry is attempted. + */ @SuppressWarnings("unchecked") public Listeners whenRetry(ResultListener listener) { retryListener = (ResultListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called after a successful invocation. + */ @SuppressWarnings("unchecked") public Listeners whenSuccess(ContextualSuccessListener listener) { ctxSuccessListener = (ContextualSuccessListener) Assert.notNull(listener, "listener"); return this; } + /** + * Registers the {@code listener} to be called after a successful invocation. + */ @SuppressWarnings("unchecked") public Listeners whenSuccess(SuccessListener listener) { successListener = (SuccessListener) Assert.notNull(listener, "listener"); diff --git a/src/main/java/net/jodah/recurrent/Recurrent.java b/src/main/java/net/jodah/recurrent/Recurrent.java index 5f20df79..752fd593 100644 --- a/src/main/java/net/jodah/recurrent/Recurrent.java +++ b/src/main/java/net/jodah/recurrent/Recurrent.java @@ -14,8 +14,8 @@ * * @author Jonathan Halterman */ -public final class Recurrent { - private Recurrent() { +public class Recurrent { + Recurrent() { } /** @@ -157,9 +157,8 @@ public static java.util.concurrent.CompletableFuture future( * Invokes the {@code callable}, sleeping between invocation attempts according to the {@code retryPolicy}. * * @throws NullPointerException if any argument is null - * @throws RuntimeException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if - * interrupted while waiting to perform a retry. Checked exceptions, including InterruptedException, are - * wrapped in RuntimeException. + * @throws RecurrentException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if + * interrupted while waiting to perform a retry. */ public static T get(Callable callable, RetryPolicy retryPolicy) { return call(callable, retryPolicy, null); @@ -170,9 +169,8 @@ public static T get(Callable callable, RetryPolicy retryPolicy) { * calling the {@code listeners} on recurrent events. * * @throws NullPointerException if any argument is null - * @throws RuntimeException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if - * interrupted while waiting to perform a retry. Checked exceptions, including InterruptedException, are - * wrapped in RuntimeException. + * @throws RecurrentException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if + * interrupted while waiting to perform a retry. */ public static T get(Callable callable, RetryPolicy retryPolicy, Listeners listeners) { return call(callable, retryPolicy, Assert.notNull(listeners, "listeners")); @@ -367,11 +365,10 @@ public static RecurrentFuture run(ContextualRunnable runnable, RetryPolic * Invokes the {@code runnable}, sleeping between invocation attempts according to the {@code retryPolicy}. * * @throws NullPointerException if any argument is null - * @throws RuntimeException if the {@code runnable} fails with a Throwable and the retry policy is exceeded or if - * interrupted while waiting to perform a retry. Checked exceptions, including InterruptedException, are - * wrapped in RuntimeException. + * @throws RecurrentException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if + * interrupted while waiting to perform a retry. */ - public static void run(Runnable runnable, RetryPolicy retryPolicy) { + public static void run(CheckedRunnable runnable, RetryPolicy retryPolicy) { call(Callables.of(runnable), retryPolicy, null); } @@ -380,12 +377,11 @@ public static void run(Runnable runnable, RetryPolicy retryPolicy) { * calling the {@code listeners} on recurrent events. * * @throws NullPointerException if any argument is null - * @throws RuntimeException if the {@code runnable} fails with a Throwable and the retry policy is exceeded or if - * interrupted while waiting to perform a retry. Checked exceptions, including InterruptedException, are - * wrapped in RuntimeException. + * @throws RecurrentException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if + * interrupted while waiting to perform a retry. */ @SuppressWarnings("unchecked") - public static void run(Runnable runnable, RetryPolicy retryPolicy, Listeners listeners) { + public static void run(CheckedRunnable runnable, RetryPolicy retryPolicy, Listeners listeners) { call(Callables.of(runnable), retryPolicy, (Listeners) Assert.notNull(listeners, "listeners")); } @@ -394,7 +390,8 @@ public static void run(Runnable runnable, RetryPolicy retryPolicy, Listeners * * @throws NullPointerException if any argument is null */ - public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPolicy, ScheduledExecutorService executor) { + public static RecurrentFuture run(CheckedRunnable runnable, RetryPolicy retryPolicy, + ScheduledExecutorService executor) { return call(AsyncCallable.of(runnable), retryPolicy, Schedulers.of(executor), null, null); } @@ -403,7 +400,7 @@ public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPolicy, * * @throws NullPointerException if any argument is null */ - public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPolicy, + public static RecurrentFuture run(CheckedRunnable runnable, RetryPolicy retryPolicy, ScheduledExecutorService executor, AsyncListeners listeners) { return call(AsyncCallable.of(runnable), retryPolicy, Schedulers.of(executor), null, Assert.notNull(listeners, "listeners")); @@ -414,7 +411,7 @@ public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPol * * @throws NullPointerException if any argument is null */ - public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPolicy, Scheduler scheduler) { + public static RecurrentFuture run(CheckedRunnable runnable, RetryPolicy retryPolicy, Scheduler scheduler) { return call(AsyncCallable.of(runnable), retryPolicy, scheduler, null, null); } @@ -422,8 +419,10 @@ public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPolicy, * Invokes the {@code runnable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}. * * @throws NullPointerException if any argument is null + * @throws RecurrentException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if + * interrupted while waiting to perform a retry. */ - public static RecurrentFuture run(Runnable runnable, RetryPolicy retryPolicy, Scheduler scheduler, + public static RecurrentFuture run(CheckedRunnable runnable, RetryPolicy retryPolicy, Scheduler scheduler, AsyncListeners listeners) { return call(AsyncCallable.of(runnable), retryPolicy, scheduler, null, Assert.notNull(listeners, "listeners")); } @@ -452,9 +451,8 @@ private static RecurrentFuture call(final AsyncCallable callable, fina /** * Calls the {@code callable} synchronously, performing retries according to the {@code retryPolicy}. * - * @throws RuntimeException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if - * interrupted while waiting to perform a retry. Checked exceptions, including InterruptedException, are - * wrapped in RuntimeException. + * @throws RecurrentException if the {@code callable} fails with a Throwable and the retry policy is exceeded or if + * interrupted while waiting to perform a retry. */ private static T call(Callable callable, RetryPolicy retryPolicy, Listeners listeners) { Assert.notNull(callable, "callable"); @@ -482,13 +480,14 @@ private static T call(Callable callable, RetryPolicy retryPolicy, Listene // Handle retry needed if (shouldRetry) { - if (listeners != null) - listeners.handleRetry(result, failure, invocation); try { Thread.sleep(TimeUnit.NANOSECONDS.toMillis(invocation.waitTime)); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw new RecurrentException(e); } + + if (listeners != null) + listeners.handleRetry(result, failure, invocation); } // Handle completion @@ -497,8 +496,8 @@ private static T call(Callable callable, RetryPolicy retryPolicy, Listene listeners.complete(result, failure, invocation, success); if (success || failure == null) return result; - RuntimeException re = failure instanceof RuntimeException ? (RuntimeException) failure - : new RuntimeException(failure); + RecurrentException re = failure instanceof RecurrentException ? (RecurrentException) failure + : new RecurrentException(failure); throw re; } } diff --git a/src/main/java/net/jodah/recurrent/RecurrentException.java b/src/main/java/net/jodah/recurrent/RecurrentException.java new file mode 100644 index 00000000..bd27dd9b --- /dev/null +++ b/src/main/java/net/jodah/recurrent/RecurrentException.java @@ -0,0 +1,13 @@ +package net.jodah.recurrent; + +/** + * Thrown when a synchronous Recurrent run() call fails with an exception. Use {@link Throwable#getCause()} to learn the + * cause of the failure. + * + * @author Jonathan Halterman + */ +public class RecurrentException extends RuntimeException { + RecurrentException(Throwable t) { + super(t); + } +} diff --git a/src/main/java/net/jodah/recurrent/RecurrentFuture.java b/src/main/java/net/jodah/recurrent/RecurrentFuture.java index 5e98b24a..6902c4bd 100644 --- a/src/main/java/net/jodah/recurrent/RecurrentFuture.java +++ b/src/main/java/net/jodah/recurrent/RecurrentFuture.java @@ -97,6 +97,9 @@ public boolean isDone() { return done; } + /** + * Registers the {@code listener} to be called when an invocation is completed. + */ public RecurrentFuture whenComplete(ContextualResultListener listener) { listeners.whenComplete(listener); if (done) @@ -104,6 +107,9 @@ public RecurrentFuture whenComplete(ContextualResultListener whenComplete(ResultListener listener) { listeners.whenComplete(listener); if (done) @@ -111,28 +117,43 @@ public RecurrentFuture whenComplete(ResultListener whenCompleteAsync(ContextualResultListener listener) { call(done, asyncCtxCompleteListener = new AsyncCtxResultListener(listener)); return this; } + /** + * Registers the {@code listener} to be called asynchronously when an invocation is completed. + */ public RecurrentFuture whenCompleteAsync(ContextualResultListener listener, ExecutorService executor) { call(done, asyncCtxCompleteListener = new AsyncCtxResultListener(listener, executor)); return this; } + /** + * Registers the {@code listener} to be called asynchronously when an invocation is completed. + */ public RecurrentFuture whenCompleteAsync(ResultListener listener) { call(done, asyncCompleteListener = new AsyncResultListener(listener)); return this; } + /** + * Registers the {@code listener} to be called asynchronously when an invocation is completed. + */ public RecurrentFuture whenCompleteAsync(ResultListener listener, ExecutorService executor) { call(done, asyncCompleteListener = new AsyncResultListener(listener, executor)); return this; } + /** + * Registers the {@code listener} to be called when the retry policy is exceeded and the result is a failure. + */ public RecurrentFuture whenFailure(ContextualResultListener listener) { listeners.whenFailure(listener); if (done && !success) @@ -140,6 +161,9 @@ public RecurrentFuture whenFailure(ContextualResultListener whenFailure(ResultListener listener) { listeners.whenFailure(listener); if (done && !success) @@ -147,28 +171,47 @@ public RecurrentFuture whenFailure(ResultListener whenFailureAsync(ContextualResultListener listener) { call(done && !success, asyncCtxFailureListener = new AsyncCtxResultListener(listener)); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public RecurrentFuture whenFailureAsync(ContextualResultListener listener, ExecutorService executor) { call(done && !success, asyncCtxFailureListener = new AsyncCtxResultListener(listener, executor)); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public RecurrentFuture whenFailureAsync(ResultListener listener) { call(done && !success, asyncFailureListener = new AsyncResultListener(listener)); return this; } + /** + * Registers the {@code listener} to be called asynchronously when the retry policy is exceeded and the result is a + * failure. + */ public RecurrentFuture whenFailureAsync(ResultListener listener, ExecutorService executor) { call(done && !success, asyncFailureListener = new AsyncResultListener(listener, executor)); return this; } + /** + * Registers the {@code listener} to be called after a successful invocation. + */ public RecurrentFuture whenSuccess(ContextualSuccessListener listener) { listeners.whenSuccess(listener); if (done && success) @@ -176,6 +219,9 @@ public RecurrentFuture whenSuccess(ContextualSuccessListener liste return this; } + /** + * Registers the {@code listener} to be called after a successful invocation. + */ public RecurrentFuture whenSuccess(SuccessListener listener) { listeners.whenSuccess(listener); if (done && success) @@ -183,23 +229,35 @@ public RecurrentFuture whenSuccess(SuccessListener listener) { return this; } + /** + * Registers the {@code listener} to be called asynchronously after a successful invocation. + */ public RecurrentFuture whenSuccessAsync(ContextualSuccessListener listener) { call(done && success, asyncCtxSuccessListener = new AsyncCtxResultListener(Listeners.resultListenerOf(listener))); return this; } + /** + * Registers the {@code listener} to be called asynchronously after a successful invocation. + */ public RecurrentFuture whenSuccessAsync(ContextualSuccessListener listener, ExecutorService executor) { call(done && success, asyncCtxSuccessListener = new AsyncCtxResultListener(Listeners.resultListenerOf(listener), executor)); return this; } + /** + * Registers the {@code listener} to be called asynchronously after a successful invocation. + */ public RecurrentFuture whenSuccessAsync(SuccessListener listener) { call(done && success, asyncSuccessListener = new AsyncResultListener(Listeners.resultListenerOf(listener))); return this; } + /** + * Registers the {@code listener} to be called asynchronously after a successful invocation. + */ public RecurrentFuture whenSuccessAsync(SuccessListener listener, ExecutorService executor) { call(done && success, asyncSuccessListener = new AsyncResultListener(Listeners.resultListenerOf(listener), executor)); diff --git a/src/test/java/net/jodah/recurrent/Asserts.java b/src/test/java/net/jodah/recurrent/Asserts.java index c3dc2cef..d6e85eec 100644 --- a/src/test/java/net/jodah/recurrent/Asserts.java +++ b/src/test/java/net/jodah/recurrent/Asserts.java @@ -25,7 +25,7 @@ public static boolean matches(Throwable actual, Class... th public static void assertMatches(Throwable actual, Class... throwableHierarchy) { Throwable current = actual; for (Class expected : throwableHierarchy) { - if (!expected.isInstance(current)) + if (!expected.equals(current.getClass())) Assert.fail( String.format("Bad exception type. Expected %s but was %s", Arrays.toString(throwableHierarchy), actual)); current = current.getCause(); diff --git a/src/test/java/net/jodah/recurrent/AsyncListenersTest.java b/src/test/java/net/jodah/recurrent/AsyncListenersTest.java index 61a5aace..e7d931c8 100644 --- a/src/test/java/net/jodah/recurrent/AsyncListenersTest.java +++ b/src/test/java/net/jodah/recurrent/AsyncListenersTest.java @@ -35,11 +35,11 @@ public interface Service { @BeforeClass void beforeClass() { - listeners.whenFailedAttemptAsync((r, f) -> { + listeners.whenFailedAttempt((r, f) -> { failedAttemptAsync.incrementAndGet(); waiter.resume(); }); - listeners.whenFailedAttemptAsync((r, f, s) -> { + listeners.whenFailedAttempt((r, f, s) -> { waiter.assertEquals(failedAttemptStatsAsync.incrementAndGet(), s.getAttemptCount()); waiter.resume(); }); diff --git a/src/test/java/net/jodah/recurrent/ListenersTest.java b/src/test/java/net/jodah/recurrent/ListenersTest.java index 2c83d3f4..7202838c 100644 --- a/src/test/java/net/jodah/recurrent/ListenersTest.java +++ b/src/test/java/net/jodah/recurrent/ListenersTest.java @@ -137,4 +137,20 @@ public void testListenersForFailureCompletion() { assertEquals(success.get(), 0); assertEquals(successStats.get(), 0); } + + public String connect() { + return "asdf"; + } + + public void test() { + Callable callable = () -> service.connect(); + + // Given - Fail twice then succeed + when(service.connect()).thenThrow(failures(2, SocketException.class)).thenReturn(false, false, true); + + Recurrent.get(callable, new RetryPolicy().retryFor(false).withMaxRetries(2), new Listeners() + .whenRetry((c, f, stats) -> System.out.println("Failure #{}. Retrying."+ stats.getAttemptCount())) + .whenFailure((cxn, failure) -> System.out.println("Connection attempts failed"+ failure)) + .whenSuccess(cxn -> System.out.println("Connected to {}" + cxn))); + } } diff --git a/src/test/java/net/jodah/recurrent/RecurrentTest.java b/src/test/java/net/jodah/recurrent/RecurrentTest.java index a9bd00ec..57f294f8 100644 --- a/src/test/java/net/jodah/recurrent/RecurrentTest.java +++ b/src/test/java/net/jodah/recurrent/RecurrentTest.java @@ -38,11 +38,11 @@ public class RecurrentTest { private Waiter waiter; // Results from a synchronous Recurrent call - @SuppressWarnings("unchecked") Class[] syncThrowables = new Class[] { RuntimeException.class, + @SuppressWarnings("unchecked") Class[] syncThrowables = new Class[] { RecurrentException.class, SocketException.class }; // Results from a get against a future that wraps a synchronous Recurrent call @SuppressWarnings("unchecked") Class[] futureSyncThrowables = new Class[] { - ExecutionException.class, RuntimeException.class, SocketException.class }; + ExecutionException.class, RecurrentException.class, SocketException.class }; // Results from a get against a future that wraps an asynchronous Recurrent call @SuppressWarnings("unchecked") Class[] futureAsyncThrowables = new Class[] { ExecutionException.class, SocketException.class }; @@ -60,7 +60,7 @@ protected void beforeMethod() { } public void shouldRun() throws Throwable { - Runnable runnable = () -> service.connect(); + CheckedRunnable runnable = () -> service.connect(); // Given - Fail twice then succeed when(service.connect()).thenThrow(failures(2, SocketException.class)).thenReturn(true); @@ -85,7 +85,7 @@ private void assertRunWithExecutor(Object runnable) throws Throwable { when(service.connect()).thenThrow(failures(2, SocketException.class)).thenReturn(true); // When - RecurrentFuture future = runnable instanceof Runnable ? Recurrent.run((Runnable) runnable, retryAlways, executor) + RecurrentFuture future = runnable instanceof CheckedRunnable ? Recurrent.run((CheckedRunnable) runnable, retryAlways, executor) : Recurrent.run((ContextualRunnable) runnable, retryAlways, executor); // Then @@ -103,7 +103,7 @@ private void assertRunWithExecutor(Object runnable) throws Throwable { when(service.connect()).thenThrow(failures(10, SocketException.class)); // When - RecurrentFuture future2 = runnable instanceof Runnable ? Recurrent.run((Runnable) runnable, retryTwice, executor) + RecurrentFuture future2 = runnable instanceof CheckedRunnable ? Recurrent.run((CheckedRunnable) runnable, retryTwice, executor) : Recurrent.run((ContextualRunnable) runnable, retryTwice, executor); // Then @@ -118,7 +118,7 @@ private void assertRunWithExecutor(Object runnable) throws Throwable { } public void shouldRunWithExecutor() throws Throwable { - assertRunWithExecutor((Runnable) () -> service.connect()); + assertRunWithExecutor((CheckedRunnable) () -> service.connect()); } public void shouldRunContextualWithExecutor() throws Throwable { @@ -337,7 +337,7 @@ public void shouldThrowOnNonRetriableFailure() throws Throwable { RetryPolicy retryPolicy = new RetryPolicy().retryOn(ConnectException.class); // When / Then - assertThrows(() -> Recurrent.get(() -> service.connect(), retryPolicy), IllegalStateException.class); + assertThrows(() -> Recurrent.get(() -> service.connect(), retryPolicy), RecurrentException.class, IllegalStateException.class); verify(service, times(3)).connect(); }