Skip to content

Commit

Permalink
Fallback and listener improvements
Browse files Browse the repository at this point in the history
* Added additional fallback method overloads
* Allowed the various listener/fallback function types to throw checked exceptions
* Moved all Failsafe config methods to common classes
  • Loading branch information
jhalterman committed Jul 27, 2016
1 parent 98b9bdf commit 1be3ab5
Show file tree
Hide file tree
Showing 24 changed files with 523 additions and 565 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ Or compute an alternative result such as from a backup resource:

```java
Failsafe.with(retryPolicy)
.withFallback(f -> this::connectToBackup)
.withFallback(this::connectToBackup)
.get(this::connectToPrimary);
```

Expand Down
30 changes: 14 additions & 16 deletions src/main/java/net/jodah/failsafe/AbstractExecution.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import net.jodah.failsafe.util.Duration;

abstract class AbstractExecution extends ExecutionContext {
final FailsafeConfig<Object, ?> config;
final RetryPolicy retryPolicy;
final CircuitBreaker circuitBreaker;
final ListenerConfig<?, Object> listeners;

// Mutable state
long attemptStartTime;
Expand All @@ -25,11 +25,11 @@ abstract class AbstractExecution extends ExecutionContext {
*
* @throws NullPointerException if {@code retryPolicy} is null
*/
AbstractExecution(RetryPolicy retryPolicy, CircuitBreaker circuitBreaker, ListenerConfig<?, Object> listeners) {
AbstractExecution(FailsafeConfig<Object, ?> config) {
super(new Duration(System.nanoTime(), TimeUnit.NANOSECONDS));
this.retryPolicy = retryPolicy;
this.circuitBreaker = circuitBreaker;
this.listeners = listeners;
this.config = config;
retryPolicy = config.retryPolicy;
this.circuitBreaker = config.circuitBreaker;
waitNanos = delayNanos = retryPolicy.getDelay().toNanos();
}

Expand Down Expand Up @@ -122,17 +122,15 @@ else if (retryPolicy.getJitterFactor() > 0.0)
success = completed && !isAbortable && !isRetryable && failure == null;

// Call listeners
if (listeners != null) {
if (!success)
listeners.handleFailedAttempt(result, failure, this);
if (isAbortable)
listeners.handleAbort(result, failure, this);
else {
if (retriesExceeded)
listeners.handleRetriesExceeded(result, failure, this);
if (completed)
listeners.handleComplete(result, failure, this, success);
}
if (!success)
config.handleFailedAttempt(result, failure, this);
if (isAbortable)
config.handleAbort(result, failure, this);
else {
if (retriesExceeded)
config.handleRetriesExceeded(result, failure, this);
if (completed)
config.handleComplete(result, failure, this, success);
}

return completed;
Expand Down
29 changes: 13 additions & 16 deletions src/main/java/net/jodah/failsafe/AsyncExecution.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import net.jodah.failsafe.function.BiFunction;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.concurrent.Scheduler;

Expand All @@ -16,19 +15,17 @@
public final class AsyncExecution extends AbstractExecution {
private final Callable<Object> callable;
private final FailsafeFuture<Object> future;
private final BiFunction<Object, Throwable, Object> fallback;
private final Scheduler scheduler;
volatile boolean completeCalled;
volatile boolean retryCalled;

@SuppressWarnings("unchecked")
<T> AsyncExecution(Callable<T> callable, RetryPolicy retryPolicy, CircuitBreaker circuitBreaker, Scheduler scheduler,
FailsafeFuture<T> future, ListenerConfig<?, Object> listeners, BiFunction<T, Throwable, T> fallback) {
super(retryPolicy, circuitBreaker, listeners);
<T> AsyncExecution(Callable<T> callable, Scheduler scheduler, FailsafeFuture<T> future,
FailsafeConfig<Object, ?> config) {
super(config);
this.callable = (Callable<Object>) callable;
this.scheduler = scheduler;
this.future = (FailsafeFuture<Object>) future;
this.fallback = (BiFunction<Object, Throwable, Object>) fallback;
}

/**
Expand Down Expand Up @@ -115,17 +112,17 @@ public boolean retryOn(Throwable failure) {
* flags, and calling the retry listeners.
*/
void before() {
if (circuitBreaker != null && !circuitBreaker.allowsExecution()) {
if (config.circuitBreaker != null && !config.circuitBreaker.allowsExecution()) {
completed = true;
Exception failure = new CircuitBreakerOpenException();
if (listeners != null)
listeners.handleComplete(null, failure, this, false);
future.complete(null, failure, fallback);
if (config != null)
config.handleComplete(null, failure, this, false);
future.complete(null, failure, config.fallback);
return;
}

if (completeCalled && listeners != null)
listeners.handleRetry(lastResult, lastFailure, this);
if (completeCalled && config != null)
config.handleRetry(lastResult, lastFailure, this);

super.before();
completeCalled = false;
Expand All @@ -141,7 +138,7 @@ void before() {
synchronized boolean complete(Object result, Throwable failure, boolean checkArgs) {
if (!completeCalled) {
if (super.complete(result, failure, checkArgs))
future.complete(result, failure, fallback);
future.complete(result, failure, config.fallback);
completeCalled = true;
}

Expand All @@ -161,9 +158,9 @@ synchronized boolean completeOrRetry(Object result, Throwable failure) {
return true;
} catch (Throwable t) {
failure = t;
if (listeners != null)
listeners.handleComplete(null, t, this, false);
future.complete(null, failure, fallback);
if (config != null)
config.handleComplete(null, t, this, false);
future.complete(null, failure, config.fallback);
}
}

Expand Down
123 changes: 20 additions & 103 deletions src/main/java/net/jodah/failsafe/AsyncFailsafe.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,26 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import net.jodah.failsafe.Callables.AsyncCallableWrapper;
import net.jodah.failsafe.Functions.AsyncCallableWrapper;
import net.jodah.failsafe.function.AsyncCallable;
import net.jodah.failsafe.function.AsyncRunnable;
import net.jodah.failsafe.function.BiFunction;
import net.jodah.failsafe.function.CheckedBiFunction;
import net.jodah.failsafe.function.CheckedRunnable;
import net.jodah.failsafe.function.ContextualCallable;
import net.jodah.failsafe.function.ContextualRunnable;
import net.jodah.failsafe.function.Function;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.concurrent.Scheduler;

/**
* Performs asynchronous executions according to a {@link RetryPolicy} and {@link CircuitBreaker}.
* Performs asynchronous executions with failures handled according to a configured {@link #with(RetryPolicy) retry
* policy}, {@link #with(CircuitBreaker) circuit breaker} and
* {@link #withFallback(net.jodah.failsafe.function.CheckedBiFunction) fallback}.
*
* @author Jonathan Halterman
* @param <R> listener result type
*/
public class AsyncFailsafe<R> extends AsyncListenerConfig<AsyncFailsafe<R>, R> {
private RetryPolicy retryPolicy;
private CircuitBreaker circuitBreaker;
private BiFunction<R, Throwable, R> fallback;

AsyncFailsafe(SyncFailsafe<R> failsafe, Scheduler scheduler) {
super(scheduler);
this.retryPolicy = failsafe.retryPolicy;
this.circuitBreaker = failsafe.circuitBreaker;
this.listeners = failsafe.listeners;
this.listenerRegistry = failsafe.listenerRegistry;
public class AsyncFailsafe<R> extends AsyncFailsafeConfig<R, AsyncFailsafe<R>> {
AsyncFailsafe(FailsafeConfig<R, ?> config, Scheduler scheduler) {
super(config, scheduler);
}

/**
Expand All @@ -46,7 +38,7 @@ public class AsyncFailsafe<R> extends AsyncListenerConfig<AsyncFailsafe<R>, R> {
public <T> java.util.concurrent.CompletableFuture<T> future(
Callable<java.util.concurrent.CompletableFuture<T>> callable) {
java.util.concurrent.CompletableFuture<T> response = new java.util.concurrent.CompletableFuture<T>();
call(Callables.ofFuture(callable), new FailsafeFuture<T>(response));
call(Functions.asyncOfFuture(callable), new FailsafeFuture<T>(response));
return response;
}

Expand All @@ -62,7 +54,7 @@ public <T> java.util.concurrent.CompletableFuture<T> future(
public <T> java.util.concurrent.CompletableFuture<T> future(
ContextualCallable<java.util.concurrent.CompletableFuture<T>> callable) {
java.util.concurrent.CompletableFuture<T> response = new java.util.concurrent.CompletableFuture<T>();
call(Callables.ofFuture(callable), new FailsafeFuture<T>(response));
call(Functions.asyncOfFuture(callable), new FailsafeFuture<T>(response));
return response;
}

Expand All @@ -79,7 +71,7 @@ public <T> java.util.concurrent.CompletableFuture<T> future(
public <T> java.util.concurrent.CompletableFuture<T> futureAsync(
AsyncCallable<java.util.concurrent.CompletableFuture<T>> callable) {
java.util.concurrent.CompletableFuture<T> response = new java.util.concurrent.CompletableFuture<T>();
call(Callables.ofFuture(callable), new FailsafeFuture<T>(response));
call(Functions.asyncOfFuture(callable), new FailsafeFuture<T>(response));
return response;
}

Expand All @@ -91,7 +83,7 @@ public <T> java.util.concurrent.CompletableFuture<T> futureAsync(
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
public <T> FailsafeFuture<T> get(Callable<T> callable) {
return call(Callables.asyncOf(callable), null);
return call(Functions.asyncOf(callable), null);
}

/**
Expand All @@ -102,7 +94,7 @@ public <T> FailsafeFuture<T> get(Callable<T> callable) {
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
public <T> FailsafeFuture<T> get(ContextualCallable<T> callable) {
return call(Callables.asyncOf(callable), null);
return call(Functions.asyncOf(callable), null);
}

/**
Expand All @@ -114,7 +106,7 @@ public <T> FailsafeFuture<T> get(ContextualCallable<T> callable) {
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
public <T> FailsafeFuture<T> getAsync(AsyncCallable<T> callable) {
return call(Callables.asyncOf(callable), null);
return call(Functions.asyncOf(callable), null);
}

/**
Expand All @@ -125,7 +117,7 @@ public <T> FailsafeFuture<T> getAsync(AsyncCallable<T> callable) {
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
public FailsafeFuture<Void> run(CheckedRunnable runnable) {
return call(Callables.<Void>asyncOf(runnable), null);
return call(Functions.<Void>asyncOf(runnable), null);
}

/**
Expand All @@ -136,7 +128,7 @@ public FailsafeFuture<Void> run(CheckedRunnable runnable) {
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
public FailsafeFuture<Void> run(ContextualRunnable runnable) {
return call(Callables.<Void>asyncOf(runnable), null);
return call(Functions.<Void>asyncOf(runnable), null);
}

/**
Expand All @@ -148,81 +140,7 @@ public FailsafeFuture<Void> run(ContextualRunnable runnable) {
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
public FailsafeFuture<Void> runAsync(AsyncRunnable runnable) {
return call(Callables.<Void>asyncOf(runnable), null);
}

/**
* Configures the {@code circuitBreaker} to be used to control the rate of event execution.
*
* @throws NullPointerException if {@code circuitBreaker} is null
* @throws IllegalStateException if a circuit breaker is already configured
*/
public AsyncFailsafe<R> with(CircuitBreaker circuitBreaker) {
Assert.state(this.circuitBreaker == null, "A circuit breaker has already been configurd");
this.circuitBreaker = Assert.notNull(circuitBreaker, "circuitBreaker");
return this;
}

/**
* Configures the {@code listeners} to be called as execution events occur.
*
* @throws NullPointerException if {@code listeners} is null
*/
@SuppressWarnings("unchecked")
public <T> AsyncFailsafe<T> with(Listeners<T> listeners) {
this.listeners = (Listeners<R>) Assert.notNull(listeners, "listeners");
return (AsyncFailsafe<T>) this;
}

/**
* Configures the {@code retryPolicy} to be used for retrying failed executions.
*
* @throws NullPointerException if {@code retryPolicy} is null
* @throws IllegalStateException if a retry policy is already configured
*/
public AsyncFailsafe<R> with(RetryPolicy retryPolicy) {
Assert.state(this.retryPolicy == RetryPolicy.NEVER, "A retry policy has already been configurd");
this.retryPolicy = Assert.notNull(retryPolicy, "retryPolicy");
return this;
}

/**
* Configures the {@code fallback} action to be executed if execution fails.
*
* @throws NullPointerException if {@code fallback} is null
* @throws IllegalStateException if {@code withFallback} method has already been called
*/
@SuppressWarnings("unchecked")
public AsyncFailsafe<R> withFallback(BiFunction<? extends R, ? extends Throwable, ? extends R> fallback) {
Assert.state(this.fallback == null, "withFallback has already been called");
this.fallback = (BiFunction<R, Throwable, R>) Assert.notNull(fallback, "fallback");
return this;
}

/**
* Configures the {@code fallback} action to be executed if execution fails.
*
* @throws NullPointerException if {@code fallback} is null
* @throws IllegalStateException if {@code withFallback} method has already been called
*/
@SuppressWarnings("unchecked")
public AsyncFailsafe<R> withFallback(Function<? extends Throwable, ? extends R> fallback) {
Assert.state(this.fallback == null, "withFallback has already been called");
this.fallback = (BiFunction<R, Throwable, R>) Callables
.<R, Throwable, R>of((Function<Throwable, R>) Assert.notNull(fallback, "fallback"));
return this;
}

/**
* Configures the {@code fallback} result to be returned if execution fails.
*
* @throws NullPointerException if {@code fallback} is null
* @throws IllegalStateException if {@code withFallback} method has already been called
*/
public AsyncFailsafe<R> withFallback(R fallback) {
Assert.state(this.fallback == null, "withFallback has already been called");
this.fallback = Callables.of(Assert.notNull(fallback, "fallback"));
return this;
return call(Functions.<Void>asyncOf(runnable), null);
}

/**
Expand All @@ -241,19 +159,18 @@ private <T> FailsafeFuture<T> call(AsyncCallableWrapper<T> callable, FailsafeFut
CircuitBreakerOpenException e = new CircuitBreakerOpenException();
if (fallback == null)
throw e;
future.complete(null, e, (BiFunction<T, Throwable, T>) fallback);
future.complete(null, e, (CheckedBiFunction<T, Throwable, T>) fallback);
return future;
}

AsyncExecution execution = new AsyncExecution(callable, retryPolicy, circuitBreaker, scheduler, future,
(ListenerConfig<?, Object>) this, (BiFunction<T, Throwable, T>) fallback);
AsyncExecution execution = new AsyncExecution(callable, scheduler, future, (FailsafeConfig<Object, ?>) this);
callable.inject(execution);

try {
future.setFuture((Future<T>) scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS));
} catch (Throwable t) {
handleComplete(null, t, execution, false);
future.complete(null, t, (BiFunction<T, Throwable, T>) fallback);
future.complete(null, t, (CheckedBiFunction<T, Throwable, T>) fallback);
}

return future;
Expand Down

0 comments on commit 1be3ab5

Please sign in to comment.