Skip to content

Commit

Permalink
feat: implement new Cffu.cffuJoin method support timeout join
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Mar 19, 2023
1 parent a259321 commit 323e14a
Showing 1 changed file with 87 additions and 7 deletions.
94 changes: 87 additions & 7 deletions src/main/java/io/foldright/cffu/Cffu.java
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,14 @@ public Cffu<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor
public Cffu<T> orTimeout(long timeout, TimeUnit unit) {
if (isMinimalStage) throw new UnsupportedOperationException("unsupported because this a minimal stage");

orTimeoutCf0(cf, timeout, unit);
return this;
}

private static <U> void orTimeoutCf0(CompletableFuture<U> cf, long timeout, TimeUnit unit) {
if (IS_JAVA9_PLUS) {
cf.orTimeout(timeout, unit);
return this;
return;
}

// below code is copied from CompletableFuture#orTimeout with small adoption
Expand All @@ -629,7 +634,6 @@ public Cffu<T> orTimeout(long timeout, TimeUnit unit) {
ScheduledFuture<?> f = Delayer.delayToTimoutCf(cf, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
}
return this;
}

/**
Expand Down Expand Up @@ -933,9 +937,10 @@ public <U> Cffu<U> handleAsync(
////////////////////////////////////////////////////////////////////////////////
//# Read(explicitly) methods
//
// - get()
// - get(timeout, unit)
// - join()
// - get() // BLOCKING
// - get(timeout, unit) // BLOCKING
// - join() // BLOCKING
// - cffuJoin() // BLOCKING
// - getNow(T valueIfAbsent)
// - resultNow()
// - exceptionNow()
Expand All @@ -959,6 +964,11 @@ public <U> Cffu<U> handleAsync(
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
* @see #join()
* @see #cffuJoin(long, TimeUnit)
* @see #getNow(Object)
* @see #resultNow()
* @see #get(long, TimeUnit)
*/
@Blocking
@Nullable
Expand All @@ -980,6 +990,11 @@ public T get() throws InterruptedException, ExecutionException {
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws TimeoutException if the wait timed out
* @see #cffuJoin(long, TimeUnit)
* @see #getNow(Object)
* @see #resultNow()
* @see #join()
* @see #get()
*/
@Blocking
@Nullable
Expand All @@ -993,6 +1008,7 @@ public T get(long timeout, TimeUnit unit)

/**
* Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally.
* <p>
* To better conform with the use of common functional forms, if a computation involved in the completion
* of this Cffu threw an exception, this method throws an (unchecked) {@link CompletionException}
* with the underlying exception as its cause.
Expand All @@ -1001,6 +1017,11 @@ public T get(long timeout, TimeUnit unit)
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed exceptionally
* or a completion computation threw an exception
* @see #cffuJoin(long, TimeUnit)
* @see #getNow(Object)
* @see #resultNow()
* @see #get(long, TimeUnit)
* @see #get()
*/
@Blocking
@Nullable
Expand All @@ -1010,6 +1031,52 @@ public T join() {
return cf.join();
}

/**
* Waits if necessary for at most the given time for the computation to complete,
* and then retrieves its result value when complete, or throws an (unchecked) exception if completed exceptionally.
* <p>
* <b><i>NOTE:<br></i></b>
* call this method
* <p>
* {@code result = cffu.cffuJoin(timeout, unit);}
* <p>
* is same as:
* <pre>{@code result = cffu.copy() // defensive copy to avoid write this cffu unexpectedly
* .orTimeout(timeout, unit)
* .join();
* }</pre>
*
* <b><i>CAUTION:<br></i></b>
* if the wait timed out, this method throws an (unchecked) {@link CompletionException}
* with the {@link TimeoutException} as its cause;
* NOT throws a (checked) {@link TimeoutException} like {@link #get(long, TimeUnit)}.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed exceptionally
* or a completion computation threw an exception
* or the wait timed out(with the {@code TimeoutException} as its cause)
* @see #join()
* @see #getNow(Object)
* @see #resultNow()
* @see #get(long, TimeUnit)
* @see #get()
* @see #orTimeout(long, TimeUnit)
*/
@Blocking
@Nullable
public T cffuJoin(long timeout, TimeUnit unit) {
if (isMinimalStage) throw new UnsupportedOperationException("unsupported because this a minimal stage");

if (cf.isDone()) return cf.join();

CompletableFuture<T> f = copyCf0(cf);
orTimeoutCf0(f, timeout, unit);
return f.join();
}

/**
* Returns the result value (or throws any encountered exception) if completed,
* else returns the given valueIfAbsent.
Expand All @@ -1019,6 +1086,11 @@ public T join() {
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed exceptionally
* or a completion computation threw an exception
* @see #resultNow()
* @see #cffuJoin(long, TimeUnit)
* @see #join()
* @see #get(long, TimeUnit)
* @see #get()
*/
@Nullable
public T getNow(T valueIfAbsent) {
Expand All @@ -1038,6 +1110,8 @@ public T getNow(T valueIfAbsent) {
* .map(Future::resultNow)
* .toList();
* }</pre>
*
* @see #getNow(Object)
*/
@Nullable
@Override
Expand Down Expand Up @@ -1077,6 +1151,7 @@ public T resultNow() {
* @return the exception thrown by the task
* @throws IllegalStateException if the task has not completed, the task completed normally,
* or the task was cancelled
* @see #resultNow()
*/
@Override
public Throwable exceptionNow() {
Expand Down Expand Up @@ -1344,10 +1419,15 @@ public CompletableFuture<T> toCompletableFuture() {
*/
@Contract(pure = true)
public Cffu<T> copy() {
return reset(copyCf0(cf));
}

@Contract(pure = true)
private static <U> CompletableFuture<U> copyCf0(CompletableFuture<U> cf) {
if (IS_JAVA9_PLUS) {
return reset(cf.copy());
return cf.copy();
}
return thenApply(Function.identity());
return cf.thenApply(Function.identity());
}

/**
Expand Down

0 comments on commit 323e14a

Please sign in to comment.