Skip to content

Commit

Permalink
util: add CompletableFutures#fromCompletableFuture method
Browse files Browse the repository at this point in the history
Motivation:
for smooth migration from guava's ListenableFuture to java8's
CompletableFuture there will be a time period where both will be used in
parallel.

Modification:
add CompletableFutures#fromCompletableFuture method to create a
ListenableFuture from CompletableFuture.

Result:
code that uses CompletableFuture can be mixed with ListenableFuture
based one.

Acked-by: Marina Sahakyan
Target: master
Require-book: no
Require-notes: no
  • Loading branch information
kofemann committed Aug 29, 2019
1 parent b994018 commit f524ed7
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 2 deletions.
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2018 Deutsches Elektronen-Synchrotron
* Copyright (C) 2018 - 2019 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand All @@ -17,23 +17,66 @@
*/
package org.dcache.util;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* Helper class to handle CompletableFuture.
*/
public class CompletableFutures {


private CompletableFutures() {
// no instance allowed
}


/**
* A ListenableFuture that is wrapped around CompletableFuture.
* @param <T> The result type returned by this Future's {@code get} method.
*/
private static class ListenableFutureImpl<T> extends AbstractFuture<T>
implements ListenableFuture<T>, BiConsumer<T, Throwable> {

private final CompletableFuture<T> inner;

public ListenableFutureImpl(CompletableFuture<T> inner) {
this.inner = inner;
inner.whenComplete(this);
}

@Override
public void accept(T value, Throwable throwable) {
if (throwable != null) {
if (throwable instanceof CancellationException) {
// interruption flag is not propagated. just be on the safe side...
cancel(false);
} else {
setException(throwable);
}
} else {
set(value);
}
}
}

/**
* Create a ListenableFuture from java 8 CompletableFuture.
*
* @param completable ListenableFuture to convert.
* @return new ListenableFuture.
* @param <T> The result type returned by this Future's {@code get} method
*/
public static <T> ListenableFuture<T> fromCompletableFuture(CompletableFuture<T> completable) {
return new ListenableFutureImpl<>(completable);
}

/**
* Create a CompletableFuture from guava's ListenableFuture to
* help migration from Guava to Java8.
Expand Down
@@ -0,0 +1,139 @@
package org.dcache.util;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class CompletableFuturesTest {

@Test
public void shouldNotBeDoneIfCompletableNotDone() {

CompletableFuture<Void> completable = new CompletableFuture<>();

ListenableFuture<Void> listenable = CompletableFutures.fromCompletableFuture(completable);
assertFalse(listenable.isDone());
}

@Test
public void shouldNotBeDoneIfListenableFutureNotDone() {

ListenableFuture<Void> listenable = SettableFuture.create();
CompletableFuture<Void> completable = CompletableFutures.fromListenableFuture(listenable);

assertFalse(completable.isDone());
}

@Test
public void shouldBeDoneWhenCompletableIsDone() {

CompletableFuture<Void> completable = CompletableFuture.completedFuture(null);

ListenableFuture<Void> listenable = CompletableFutures.fromCompletableFuture(completable);
assertTrue(listenable.isDone());
}

@Test
public void shouldBeDoneWhenListenableFutureIsDone() {

ListenableFuture<Void> listenable = Futures.immediateFuture(null);
CompletableFuture<Void> completable = CompletableFutures.fromListenableFuture(listenable);

assertTrue(completable.isDone());
}

@Test
public void shouldCompleteWhenCompletableIsDone() {

CompletableFuture<Void> completable = new CompletableFuture<>();

ListenableFuture<Void> listenable = CompletableFutures.fromCompletableFuture(completable);
completable.complete(null);
assertTrue(listenable.isDone());
}

@Test
public void shouldCompleteWhenListenableIsDone() {

SettableFuture<Void> listenable = SettableFuture.create();
CompletableFuture<Void> completable = CompletableFutures.fromListenableFuture(listenable);

listenable.set(null);
assertTrue(completable.isDone());
}

// requires Java9+
// @Test(expected = ExecutionException.class)
// public void shouldFailWhenCompletableIsFailed() throws InterruptedException, ExecutionException {
//
// CompletableFuture<Void> completable = CompletableFuture.failedFuture(new IOException());
//
// ListenableFuture<Void> listenable = CompletableFutures.fromCompletableFuture(completable);
// assertTrue(listenable.isDone());
// listenable.get();
// }

@Test(expected = ExecutionException.class)
public void shouldFailWhenCompletableIsFailed() throws InterruptedException, ExecutionException {

CompletableFuture<Void> completable = new CompletableFuture<>();
ListenableFuture<Void> listenable = CompletableFutures.fromCompletableFuture(completable);

completable.completeExceptionally(new IOException());

assertTrue(listenable.isDone());
listenable.get();
}

@Test(expected = ExecutionException.class)
public void shouldFailWhenListenableIsFailed() throws InterruptedException, ExecutionException {

ListenableFuture<Void> listenable = Futures.immediateFailedFuture(new IOException());
CompletableFuture<Void> completable = CompletableFutures.fromListenableFuture(listenable);

assertTrue(completable.isDone());
completable.get();
}

@Test(expected = ExecutionException.class)
public void shouldFailWhenListenableCompletesExceptionally() throws InterruptedException, ExecutionException {

SettableFuture<Void> listenable = SettableFuture.create();
CompletableFuture<Void> completable = CompletableFutures.fromListenableFuture(listenable);

listenable.setException(new IOException());
assertTrue(completable.isDone());
completable.get();
}

@Test
public void shouldCancelWhenCompletableIsCanceled() throws InterruptedException, ExecutionException {

CompletableFuture<Void> completable = new CompletableFuture<>();
ListenableFuture<Void> listenable = CompletableFutures.fromCompletableFuture(completable);

completable.cancel(true);
assertTrue(listenable.isDone());
assertTrue(listenable.isCancelled());
}

@Test
public void shouldCancelWhenListenableIsCanceled() throws InterruptedException, ExecutionException {

SettableFuture<Void> listenable = SettableFuture.create();
CompletableFuture<Void> completable = CompletableFutures.fromListenableFuture(listenable);

listenable.cancel(true);
assertTrue(completable.isDone());
assertTrue(completable.isCancelled());
}

}

0 comments on commit f524ed7

Please sign in to comment.