Skip to content

Commit

Permalink
Some enhancements for Futures.transform and Futures.catching
Browse files Browse the repository at this point in the history
 * extract a template method and 2 subclasses from CatchingFuture and ChainingListenableFuture.  This will allow us to eliminate asAsyncFunction() and 2 allocations per transform
 * make CatchingFuture implement Runnable and save a Runnable allocation
 * have CatchingFuture/ChainingListenableFuture clear all their fields on cancel, which should prevent pinning the transformers
 * have rejectionPropagatingRunnable short circuit for directExecutor() which should save 2 allocations for users who prefer to pass directExecutor() explicitly
 * have Futures.catching use the rejected execution exception logic when it makes sense

#1995
#2013
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=94954378
  • Loading branch information
lukesandberg authored and cpovirk committed Jun 2, 2015
1 parent 93f2646 commit b1bec6d
Show file tree
Hide file tree
Showing 4 changed files with 394 additions and 207 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;


Expand Down Expand Up @@ -387,53 +386,61 @@ public ListenableFuture<V> apply(Throwable t) throws Exception {
}; };
} }


static class CatchingFuture<V, X extends Throwable> extends AbstractFuture.TrustedFuture<V> { private abstract static class AbstractCatchingFuture<V, X extends Throwable, F>
ListenableFuture<? extends V> running; extends AbstractFuture.TrustedFuture<V> implements Runnable {

@Nullable ListenableFuture<? extends V> running;
CatchingFuture(ListenableFuture<? extends V> input, @Nullable Class<X> exceptionType;
final Class<X> exceptionType, @Nullable F fallback;
final AsyncFunction<? super X, ? extends V> fallback,
final Executor executor) { AbstractCatchingFuture(
checkNotNull(exceptionType); ListenableFuture<? extends V> input, Class<X> exceptionType, F fallback) {
checkNotNull(fallback); this.exceptionType = checkNotNull(exceptionType);

this.running = checkNotNull(input);
running = input; this.fallback = checkNotNull(fallback);
input.addListener(new Runnable() { }
@Override public void run() {
ListenableFuture<? extends V> localRunning = running; @Override public final void run() {
running = null; ListenableFuture<? extends V> localRunning = running;
if (localRunning == null | isCancelled()) { Class<X> localExceptionType = exceptionType;
return; F localFallback = fallback;
} if (localRunning == null | localExceptionType == null | localFallback == null
Throwable throwable; | isCancelled()) {
try { return;
set(getUninterruptibly(localRunning)); }
return; Throwable throwable;
} catch (ExecutionException e) { try {
throwable = e.getCause(); set(getUninterruptibly(localRunning));
} catch (Throwable e) { // this includes cancellation exception return;
throwable = e; } catch (ExecutionException e) {
} throwable = e.getCause();
try { } catch (Throwable e) { // this includes cancellation exception
if (isInstanceOfThrowableClass(throwable, exceptionType)) { throwable = e;
@SuppressWarnings("unchecked") // verified safe by isInstance }
X castThrowable = (X) throwable; try {
ListenableFuture<? extends V> replacement = fallback.apply(castThrowable); if (isInstanceOfThrowableClass(throwable, localExceptionType)) {
checkNotNull(replacement, "AsyncFunction.apply returned null instead of a Future. " @SuppressWarnings("unchecked") // verified safe by isInstance
+ "Did you mean to return immediateFuture(null)?"); X castThrowable = (X) throwable;
setFuture(replacement); doFallback(localFallback, castThrowable);
} else { } else {
setException(throwable); setException(throwable);
}
} catch (Throwable e) {
setException(e);
}
} }
}, executor); } catch (Throwable e) {
setException(e);
}
}

/** Template method for subtypes to actually run the fallback. */
abstract void doFallback(F fallback, X throwable) throws Exception;

@Override final void done() {
this.running = null;
this.exceptionType = null;
this.fallback = null;
} }


@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public final boolean cancel(boolean mayInterruptIfRunning) {
// we need to read this field prior to calling super.cancel() because cancel will null it out
ListenableFuture<?> current = this.running; ListenableFuture<?> current = this.running;
if (super.cancel(mayInterruptIfRunning)) { if (super.cancel(mayInterruptIfRunning)) {
// May be null if the original future completed, but we were cancelled while the fallback // May be null if the original future completed, but we were cancelled while the fallback
Expand All @@ -449,6 +456,44 @@ public boolean cancel(boolean mayInterruptIfRunning) {
} }
} }


/**
* A {@link AbstractCatchingFuture} that delegates to an {@link AsyncFunction}
* and {@link #setFuture(ListenableFuture)} to implement {@link #doFallback}
*/
static final class AsyncCatchingFuture<V, X extends Throwable>
extends AbstractCatchingFuture<V, X, AsyncFunction<? super X, ? extends V>> {

AsyncCatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType,
AsyncFunction<? super X, ? extends V> fallback) {
super(input, exceptionType, fallback);
}

@Override void doFallback(
AsyncFunction<? super X, ? extends V> fallback, X cause) throws Exception {
ListenableFuture<? extends V> replacement = fallback.apply(cause);
checkNotNull(replacement, "AsyncFunction.apply returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)?");
setFuture(replacement);
}
}

/**
* A {@link AbstractCatchingFuture} that delegates to a {@link Function}
* and {@link #set(Object)} to implement {@link #doFallback}
*/
static final class CatchingFuture<V, X extends Throwable>
extends AbstractCatchingFuture<V, X, Function<? super X, ? extends V>> {
CatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType,
Function<? super X, ? extends V> fallback) {
super(input, exceptionType, fallback);
}

@Override void doFallback(Function<? super X, ? extends V> fallback, X cause) throws Exception {
V replacement = fallback.apply(cause);
set(replacement);
}
}

/** /**
* Future that delegates to another but will finish early (via a {@link * Future that delegates to another but will finish early (via a {@link
* TimeoutException} wrapped in an {@link ExecutionException}) if the * TimeoutException} wrapped in an {@link ExecutionException}) if the
Expand Down Expand Up @@ -476,8 +521,8 @@ private static final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture
// currently believed to be a purely theoretical problem (since the other actions should // currently believed to be a purely theoretical problem (since the other actions should
// supply sufficient write-barriers). // supply sufficient write-barriers).


ListenableFuture<V> delegateRef; @Nullable ListenableFuture<V> delegateRef;
Future<?> timer; @Nullable Future<?> timer;


TimeoutFuture(ListenableFuture<V> delegate) { TimeoutFuture(ListenableFuture<V> delegate) {
this.delegateRef = Preconditions.checkNotNull(delegate); this.delegateRef = Preconditions.checkNotNull(delegate);
Expand All @@ -489,7 +534,7 @@ private static final class Fire<V> implements Runnable {
// this weren't a static nested class) could cause retention of the // this weren't a static nested class) could cause retention of the
// delegate's return value (in AbstractFuture) for the duration of the // delegate's return value (in AbstractFuture) for the duration of the
// timeout in the case of successful completion. We clear this on run. // timeout in the case of successful completion. We clear this on run.
TimeoutFuture<V> timeoutFutureRef; @Nullable TimeoutFuture<V> timeoutFutureRef;


Fire(TimeoutFuture<V> timeoutFuture) { Fire(TimeoutFuture<V> timeoutFuture) {
this.timeoutFutureRef = timeoutFuture; this.timeoutFutureRef = timeoutFuture;
Expand Down Expand Up @@ -731,7 +776,7 @@ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
*/ */
public static <I, O> ListenableFuture<O> transformAsync( public static <I, O> ListenableFuture<O> transformAsync(
ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) {
ChainingListenableFuture<I, O> output = new ChainingListenableFuture<I, O>(function, input); AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function);
input.addListener(output, directExecutor()); input.addListener(output, directExecutor());
return output; return output;
} }
Expand Down Expand Up @@ -776,33 +821,38 @@ public static <I, O> ListenableFuture<O> transformAsync(
public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input, public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input,
AsyncFunction<? super I, ? extends O> function, Executor executor) { AsyncFunction<? super I, ? extends O> function, Executor executor) {
checkNotNull(executor); checkNotNull(executor);
ChainingListenableFuture<I, O> output = new ChainingListenableFuture<I, O>(function, input); AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function);
input.addListener(rejectionPropagatingRunnable(output, output, executor), directExecutor()); input.addListener(output, rejectionPropagatingExecutor(executor, output));
return output; return output;
} }


/** /**
* Returns a Runnable that will invoke the delegate Runnable on the delegate executor, but if the * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate
* task is rejected, it will propagate that rejection to the output future. * executor to the given {@code future}.
*
* <p>Note, the returned executor can only be used once.
*/ */
private static Runnable rejectionPropagatingRunnable( private static Executor rejectionPropagatingExecutor(
final AbstractFuture<?> outputFuture, final Executor delegate, final AbstractFuture<?> future) {
final Runnable delegateTask, checkNotNull(delegate);
final Executor delegateExecutor) { if (delegate == directExecutor()) {
return new Runnable() { // directExecutor() cannot throw RejectedExecutionException
@Override public void run() { return delegate;
final AtomicBoolean thrownFromDelegate = new AtomicBoolean(true); }
return new Executor() {
volatile boolean thrownFromDelegate = true;
@Override public void execute(final Runnable command) {
try { try {
delegateExecutor.execute(new Runnable() { delegate.execute(new Runnable() {
@Override public void run() { @Override public void run() {
thrownFromDelegate.set(false); thrownFromDelegate = false;
delegateTask.run(); command.run();
} }
}); });
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
if (thrownFromDelegate.get()) { if (thrownFromDelegate) {
// wrap exception? // wrap exception?
outputFuture.setException(e); future.setException(e);
} }
// otherwise it must have been thrown from a transitive call and the delegate runnable // otherwise it must have been thrown from a transitive call and the delegate runnable
// should have handled it. // should have handled it.
Expand Down Expand Up @@ -867,8 +917,7 @@ private static Runnable rejectionPropagatingRunnable(
public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
final Function<? super I, ? extends O> function) { final Function<? super I, ? extends O> function) {
checkNotNull(function); checkNotNull(function);
ChainingListenableFuture<I, O> output = ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function);
new ChainingListenableFuture<I, O>(asAsyncFunction(function), input);
input.addListener(output, directExecutor()); input.addListener(output, directExecutor());
return output; return output;
} }
Expand Down Expand Up @@ -913,19 +962,9 @@ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
final Function<? super I, ? extends O> function, Executor executor) { final Function<? super I, ? extends O> function, Executor executor) {
checkNotNull(function); checkNotNull(function);
return transformAsync(input, asAsyncFunction(function), executor); ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function);
} input.addListener(output, rejectionPropagatingExecutor(executor, output));

return output;
/** Wraps the given function as an AsyncFunction. */
static <I, O> AsyncFunction<I, O> asAsyncFunction(
final Function<? super I, ? extends O> function) {
checkNotNull(function);
return new AsyncFunction<I, O>() {
@Override public ListenableFuture<O> apply(I input) {
O output = function.apply(input);
return immediateFuture(output);
}
};
} }


/** /**
Expand All @@ -948,29 +987,27 @@ static <I, O> AsyncFunction<I, O> asAsyncFunction(
* href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
* should remove the {@code UndeclaredThrowableException} special case</a>. * should remove the {@code UndeclaredThrowableException} special case</a>.
*/ */
private static final class ChainingListenableFuture<I, O> private abstract static class AbstractChainingFuture<I, O, F>
extends AbstractFuture.TrustedFuture<O> implements Runnable { extends AbstractFuture.TrustedFuture<O> implements Runnable {

private AsyncFunction<? super I, ? extends O> function;
// In theory, this field might not be visible to a cancel() call in certain circumstances. For // In theory, this field might not be visible to a cancel() call in certain circumstances. For
// details, see the comments on the fields of TimeoutFuture. // details, see the comments on the fields of TimeoutFuture.
private ListenableFuture<? extends I> inputFuture; @Nullable ListenableFuture<? extends I> inputFuture;
@Nullable F function;


private ChainingListenableFuture( AbstractChainingFuture(ListenableFuture<? extends I> inputFuture, F function) {
AsyncFunction<? super I, ? extends O> function,
ListenableFuture<? extends I> inputFuture) {
this.function = checkNotNull(function);
this.inputFuture = checkNotNull(inputFuture); this.inputFuture = checkNotNull(inputFuture);
this.function = checkNotNull(function);
} }


@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public final boolean cancel(boolean mayInterruptIfRunning) {
/* /*
* Our additional cancellation work needs to occur even if * Our additional cancellation work needs to occur even if
* !mayInterruptIfRunning, so we can't move it into interruptTask(). * !mayInterruptIfRunning, so we can't move it into interruptTask().
*/ */
// we need to read this field prior to calling cancel() because cancel will null it out
ListenableFuture<? extends I> localInputFuture = inputFuture;
if (super.cancel(mayInterruptIfRunning)) { if (super.cancel(mayInterruptIfRunning)) {
ListenableFuture<? extends I> localInputFuture = inputFuture;
if (localInputFuture != null) { if (localInputFuture != null) {
localInputFuture.cancel(mayInterruptIfRunning); localInputFuture.cancel(mayInterruptIfRunning);
} }
Expand All @@ -980,9 +1017,11 @@ public boolean cancel(boolean mayInterruptIfRunning) {
} }


@Override @Override
public void run() { public final void run() {
try { try {
if (isCancelled()) { ListenableFuture<? extends I> localInputFuture = inputFuture;
F localFunction = function;
if (isCancelled() | localInputFuture == null | localFunction == null) {
return; return;
} }
I sourceResult; I sourceResult;
Expand All @@ -999,24 +1038,63 @@ public void run() {
setException(e.getCause()); setException(e.getCause());
return; return;
} }

doTransform(localFunction, sourceResult);
ListenableFuture<? extends O> outputFuture = function.apply(sourceResult);
checkNotNull(outputFuture, "AsyncFunction.apply returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)?");
setFuture(outputFuture);
} catch (UndeclaredThrowableException e) { } catch (UndeclaredThrowableException e) {
// Set the cause of the exception as this future's exception // Set the cause of the exception as this future's exception
setException(e.getCause()); setException(e.getCause());
} catch (Throwable t) { } catch (Throwable t) {
// This exception is irrelevant in this thread, but useful for the // This exception is irrelevant in this thread, but useful for the
// client // client
setException(t); setException(t);
} finally {
// Don't pin inputs beyond completion
function = null;
inputFuture = null;
} }
} }

/** Template method for subtypes to actually run the transform. */
abstract void doTransform(F function, I result) throws Exception;

@Override final void done() {
this.inputFuture = null;
this.function = null;
}
}

/**
* A {@link AbstractChainingFuture} that delegates to an {@link AsyncFunction} and
* {@link #setFuture(ListenableFuture)} to implement {@link #doTransform}.
*/
private static final class AsyncChainingFuture<I, O>
extends AbstractChainingFuture<I, O, AsyncFunction<? super I, ? extends O>> {
AsyncChainingFuture(ListenableFuture<? extends I> inputFuture,
AsyncFunction<? super I, ? extends O> function) {
super(inputFuture, function);
}

@Override
void doTransform(AsyncFunction<? super I, ? extends O> function, I input) throws Exception {
ListenableFuture<? extends O> outputFuture = function.apply(input);
checkNotNull(outputFuture, "AsyncFunction.apply returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)?");
setFuture(outputFuture);
}
}

/**
* A {@link AbstractChainingFuture} that delegates to a {@link Function} and
* {@link #set(Object)} to implement {@link #doTransform}.
*/
private static final class ChainingFuture<I, O>
extends AbstractChainingFuture<I, O, Function<? super I, ? extends O>> {

ChainingFuture(ListenableFuture<? extends I> inputFuture,
Function<? super I, ? extends O> function) {
super(inputFuture, function);
}

@Override
void doTransform(Function<? super I, ? extends O> function, I input) {
// TODO(lukes): move the UndeclaredThrowable catch block here?
set(function.apply(input));
}
} }


/** /**
Expand Down
Loading

0 comments on commit b1bec6d

Please sign in to comment.