Skip to content

Commit

Permalink
Added completion support for invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Jul 5, 2015
1 parent 392e4e1 commit 54f1a72
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 36 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ RetryPolicy delayPolicy = new RetryPolicy()
.withMaxRetries(100);

RetryPolicy backoffPolicy = new RetryPolicy()
.withBackoff(1, 30, TimeUnit.SECONDS)
.withMaxDuration(5, TimeUnit.MINUTES);
.withBackoff(1, 30, TimeUnit.SECONDS)
.withMaxDuration(5, TimeUnit.MINUTES);
```

#### Synchronous Retries
Expand All @@ -52,15 +52,18 @@ Asynchronous code reports failures via future callbacks rather than throwing an

```java
Recurrent.get(invocation -> {
someService.connect(host, port).onFailure((failure) -> {
// Manually retry invocation
if (!invocation.retry(failure))
someService.connect(host, port).whenComplete((result, failure) -> {
if (failure == null)
invocation.complete(result);
else if (!invocation.retry(failure))
log.error("Connection attempts failed", failure)
}
}, retryPolicy, executor));
```

Java 8 users can also use Recurrent to retry [CompletableFuture] calls:
#### CompletableFuture Integration

Java 8 users can use Recurrent to retry [CompletableFuture] calls:

```java
Recurrent.future(() -> CompletableFuture.supplyAsync(() -> "foo")
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.3.5-SNAPSHOT</version>
<version>0.3.5</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/net/jodah/recurrent/AsyncCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ public Void call() throws Exception {
};
}

static <T> AsyncCallable<T> ofFuture(
final ContextualCallable<? extends java.util.concurrent.CompletableFuture<T>> callable) {
static <T> AsyncCallable<T> ofFuture(final ContextualCallable<java.util.concurrent.CompletableFuture<T>> callable) {
return new AsyncCallable<T>() {
@Override
public synchronized T call() throws Exception {
Expand Down
36 changes: 33 additions & 3 deletions src/main/java/net/jodah/recurrent/Invocation.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.TimeUnit;

import net.jodah.recurrent.internal.util.Assert;
import net.jodah.recurrent.util.Duration;

/**
Expand All @@ -12,6 +13,7 @@
*/
public class Invocation {
private RetryPolicy retryPolicy;
private RecurrentFuture<?> future;
private long startTime;

/** Count of retry attempts */
Expand All @@ -21,12 +23,21 @@ public class Invocation {
/** Indicates whether a retry has been requested */
volatile boolean retryRequested;

Invocation(RetryPolicy retryPolicy) {
Invocation(RetryPolicy retryPolicy, RecurrentFuture<?> future) {
this.retryPolicy = retryPolicy;
this.future = future;
waitTime = retryPolicy.getDelay().toNanos();
startTime = System.nanoTime();
}

/**
* Completes the invocation.
*/
@SuppressWarnings("unchecked")
public void complete(Object result) {
((RecurrentFuture<Object>) future).complete(result, null);
}

/**
* Gets the number of retries that have been attempted so far.
*/
Expand All @@ -37,19 +48,38 @@ public int getRetryCount() {
/**
* Retries a failed invocation, returning true if the invocation's retry policy has not been exceeded, else false.
*/
public boolean retry() {
return retryInternal(null);
}

/**
* Retries a failed invocation, returning true if the invocation's retry policy has not been exceeded, else false.
*
* @throws NullPointerException if {@code failure} is null
*/
public boolean retry(Throwable failure) {
Assert.notNull(failure, "failure");
return retryInternal(failure);
}

@SuppressWarnings("unchecked")
private boolean retryInternal(Throwable failure) {
if (retryRequested)
return true;

// TODO validate failure against policy
// TODO validate failure against policy if failure != null
recordFailedAttempt();
if (!isPolicyExceeded()) {
retryRequested = true;
return true;
}

if (failure == null)
failure = new RuntimeException("Retry invocations exceeded");
((RecurrentFuture<Object>) future).complete(null, failure);
return false;
}

/**
* Returns the wait time.
*/
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/net/jodah/recurrent/Recurrent.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ private Recurrent() {
* Invokes the {@code callable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}.
*/
public static <T> java.util.concurrent.CompletableFuture<T> future(
Callable<? extends java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
Callable<java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return future(contextual(callable), retryPolicy, executor);
}
Expand All @@ -28,7 +28,7 @@ public static <T> java.util.concurrent.CompletableFuture<T> future(
* Invokes the {@code callable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}.
*/
public static <T> java.util.concurrent.CompletableFuture<T> future(
ContextualCallable<? extends java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
ContextualCallable<java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
final java.util.concurrent.CompletableFuture<T> response = new java.util.concurrent.CompletableFuture<T>();
RecurrentFuture<T> future = new RecurrentFuture<T>(executor).whenComplete(new CompletionListener<T>() {
Expand Down Expand Up @@ -103,7 +103,7 @@ private static <T> RecurrentFuture<T> call(final AsyncCallable<T> callable, fina
final ScheduledExecutorService executor, RecurrentFuture<T> future) {
if (future == null)
future = new RecurrentFuture<T>(executor);
final Invocation invocation = new Invocation(retryPolicy);
final Invocation invocation = new Invocation(retryPolicy, future);
callable.initialize(invocation, future, executor);
future.setFuture(executor.submit(callable));
return future;
Expand All @@ -116,7 +116,7 @@ private static <T> RecurrentFuture<T> call(final AsyncCallable<T> callable, fina
* wrapped in RuntimeException.
*/
private static <T> T call(Callable<T> callable, RetryPolicy retryPolicy) {
Invocation invocation = new Invocation(retryPolicy);
Invocation invocation = new Invocation(retryPolicy, null);

while (true) {
try {
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/net/jodah/recurrent/Asserts.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@
import net.jodah.recurrent.Testing.ThrowableRunnable;

public class Asserts {
@SafeVarargs
public static boolean matches(Throwable actual, Class<? extends Throwable>... throwableHierarchy) {
Throwable current = actual;
for (Class<? extends Throwable> expected : throwableHierarchy) {
if (!expected.isInstance(current))
return false;
current = current.getCause();
}
return true;
}

@SafeVarargs
public static void assertMatches(Throwable actual, Class<? extends Throwable>... throwableHierarchy) {
Throwable current = actual;
Expand Down
43 changes: 25 additions & 18 deletions src/test/java/net/jodah/recurrent/RecurrentTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.jodah.recurrent;

import static net.jodah.recurrent.Asserts.assertThrows;
import static net.jodah.recurrent.Asserts.matches;
import static net.jodah.recurrent.Testing.ignoreExceptions;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
Expand All @@ -14,6 +15,7 @@
import java.net.SocketException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -32,8 +34,8 @@ public class RecurrentTest {
private Waiter waiter;

// Results from a synchronous Recurrent call
@SuppressWarnings("unchecked") Class<? extends Throwable>[] syncThrowables = new Class[] {
RuntimeException.class, SocketException.class };
@SuppressWarnings("unchecked") Class<? extends Throwable>[] syncThrowables = new Class[] { RuntimeException.class,
SocketException.class };
// Results from a get against a future that wraps a synchronous Recurrent call
@SuppressWarnings("unchecked") Class<? extends Throwable>[] futureSyncThrowables = new Class[] {
ExecutionException.class, RuntimeException.class, SocketException.class };
Expand All @@ -54,8 +56,7 @@ protected void beforeMethod() {
}

@SuppressWarnings("unchecked")
private <T> Class<? extends Exception>[] failures(int numFailures,
Class<? extends Exception> failureType) {
private <T> Class<? extends Exception>[] failures(int numFailures, Class<? extends Exception> failureType) {
Class<? extends Exception>[] failures = new Class[numFailures];
for (int i = 0; i < numFailures; i++)
failures[i] = failureType;
Expand Down Expand Up @@ -94,16 +95,15 @@ private void assertRunWithExecutor(Object runnable) throws Throwable {
waiter.resume();
});
assertNull(future.get());
waiter.await();
waiter.await(3000);
verify(service, times(3)).connect();

// Given - Fail three times
reset(service);
when(service.connect()).thenThrow(failures(10, SocketException.class)).thenReturn(true);

// When
RecurrentFuture<?> future2 = runnable instanceof Runnable
? Recurrent.run((Runnable) runnable, retryTwice, executor)
RecurrentFuture<?> future2 = runnable instanceof Runnable ? Recurrent.run((Runnable) runnable, retryTwice, executor)
: Recurrent.run((ContextualRunnable) runnable, retryTwice, executor);

// Then
Expand All @@ -114,7 +114,7 @@ private void assertRunWithExecutor(Object runnable) throws Throwable {
waiter.resume();
});
assertThrows(() -> future2.get(), futureAsyncThrowables);
waiter.await();
waiter.await(3000);
verify(service, times(3)).connect();
}

Expand Down Expand Up @@ -169,7 +169,7 @@ private void assertGetWithExecutor(Object callable) throws Throwable {
waiter.resume();
});
assertTrue(future.get());
waiter.await();
waiter.await(3000);
verify(service, times(3)).connect();

// Given - Fail three times
Expand All @@ -189,7 +189,7 @@ private void assertGetWithExecutor(Object callable) throws Throwable {
waiter.resume();
});
assertThrows(() -> future2.get(), futureAsyncThrowables);
waiter.await();
waiter.await(3000);
verify(service, times(3)).connect();
}

Expand Down Expand Up @@ -230,7 +230,7 @@ private void assertGetFuture(Object callable) throws Throwable {
waiter.resume();
});
assertTrue(future.get());
waiter.await();
waiter.await(3000);
verify(service, times(3)).connect();

// Given - Fail three times
Expand All @@ -246,11 +246,11 @@ private void assertGetFuture(Object callable) throws Throwable {
waiter.expectResume();
future2.whenComplete((result, failure) -> {
waiter.assertNull(result);
waiter.assertTrue(failure instanceof SocketException);
waiter.assertTrue(matches(failure, CompletionException.class, SocketException.class));
waiter.resume();
});
assertThrows(() -> future2.get(), futureAsyncThrowables);
waiter.await();
waiter.await(3000);
verify(service, times(3)).connect();
}

Expand Down Expand Up @@ -286,17 +286,24 @@ public void testPerStageRetries() throws Throwable {
// Fail three times
reset(service);
when(service.connect()).thenThrow(failures(10, SocketException.class)).thenReturn(true);
assertThrows(
() -> CompletableFuture
.supplyAsync(() -> Recurrent.get(() -> service.connect(), retryTwice)).get(),
assertThrows(() -> CompletableFuture.supplyAsync(() -> Recurrent.get(() -> service.connect(), retryTwice)).get(),
futureSyncThrowables);
verify(service, times(3)).connect();
}

public void shouldCancelFuture() throws Throwable {
RecurrentFuture<?> future = Recurrent.run(() -> ignoreExceptions(() -> Thread.sleep(10000)),
retryAlways, executor);
RecurrentFuture<?> future = Recurrent.run(() -> ignoreExceptions(() -> Thread.sleep(10000)), retryAlways, executor);
future.cancel(true);
assertTrue(future.isCancelled());
}

public void shouldManuallyRetryAndComplete() throws Throwable {
Recurrent.get((ctx) -> {
if (ctx.getRetryCount() < 2)
ctx.retry();
else
ctx.complete(true);
return null;
} , retryAlways, executor);
}
}
4 changes: 2 additions & 2 deletions src/test/java/net/jodah/recurrent/examples/NettyExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public void example() throws Throwable {
Bootstrap bootstrap = createBootstrap(group);
RetryPolicy retryPolicy = new RetryPolicy();

Recurrent.get((invocation) -> bootstrap.connect(HOST, PORT).addListener((ChannelFutureListener) channelFuture -> {
Recurrent.get(invocation -> bootstrap.connect(HOST, PORT).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess())
System.out.println("Connected");
invocation.complete(channelFuture);
else if (!invocation.retry(channelFuture.cause()))
System.out.println("Connection attempts failed");
}), retryPolicy, group).whenComplete((channelFuture, failure) -> {
Expand Down

0 comments on commit 54f1a72

Please sign in to comment.