Skip to content

Commit

Permalink
Any future aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jan 14, 2016
1 parent b97b574 commit f250680
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 30 deletions.
37 changes: 32 additions & 5 deletions src/main/java/io/vertx/core/CompositeFuture.java
Expand Up @@ -33,23 +33,50 @@ public interface CompositeFuture extends Future<CompositeFuture> {
* @return the composite future * @return the composite future
*/ */
static <T1, T2> CompositeFuture all(Future<T1> f1, Future<T2> f2) { static <T1, T2> CompositeFuture all(Future<T1> f1, Future<T2> f2) {
return new CompositeFutureImpl(f1, f2); return CompositeFutureImpl.all(f1, f2);
} }


static <T1, T2, T3> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3) { static <T1, T2, T3> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return new CompositeFutureImpl(f1, f2, f3); return CompositeFutureImpl.all(f1, f2, f3);
} }


static <T1, T2, T3, T4> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) { static <T1, T2, T3, T4> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return new CompositeFutureImpl(f1, f2, f3, f4); return CompositeFutureImpl.all(f1, f2, f3, f4);
} }


static <T1, T2, T3, T4, T5> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) { static <T1, T2, T3, T4, T5> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return new CompositeFutureImpl(f1, f2, f3, f4, f5); return CompositeFutureImpl.all(f1, f2, f3, f4, f5);
} }


static <T1, T2, T3, T4, T5, T6> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) { static <T1, T2, T3, T4, T5, T6> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return new CompositeFutureImpl(f1, f2, f3, f4, f5, f6); return CompositeFutureImpl.all(f1, f2, f3, f4, f5, f6);
}

/**
* Return a composite future, succeeded when any futures is succeeded, failed when all futures are failed.
*
* @param f1 future
* @param f2 future
* @return the composite future
*/
static <T1, T2> CompositeFuture any(Future<T1> f1, Future<T2> f2) {
return CompositeFutureImpl.any(f1, f2);
}

static <T1, T2, T3> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return CompositeFutureImpl.any(f1, f2, f3);
}

static <T1, T2, T3, T4> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.any(f1, f2, f3, f4);
}

static <T1, T2, T3, T4, T5> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.any(f1, f2, f3, f4, f5);
}

static <T1, T2, T3, T4, T5, T6> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.any(f1, f2, f3, f4, f5, f6);
} }


@Override @Override
Expand Down
41 changes: 32 additions & 9 deletions src/main/java/io/vertx/core/impl/CompositeFutureImpl.java
Expand Up @@ -26,27 +26,50 @@
*/ */
public class CompositeFutureImpl extends FutureImpl<CompositeFuture> implements CompositeFuture { public class CompositeFutureImpl extends FutureImpl<CompositeFuture> implements CompositeFuture {


private final Future[] results; public static CompositeFuture all(Future<?>... results) {
private int flag; CompositeFutureImpl composite = new CompositeFutureImpl(results);

for (int i = 0;i < results.length;i++) {
public CompositeFutureImpl(Future<?>... results) { int index = i;
results[i].setHandler(ar -> {
if (ar.succeeded()) {
composite.flag |= 1 << index;
if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) {
composite.complete(composite);
}
} else {
if (!composite.isComplete()) {
composite.fail(ar.cause());
}
}
});
}
return composite;
}


public static CompositeFuture any(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
for (int i = 0;i < results.length;i++) { for (int i = 0;i < results.length;i++) {
int index = i; int index = i;
results[i].setHandler(ar -> { results[i].setHandler(ar -> {
if (ar.succeeded()) { if (ar.succeeded()) {
flag |= 1 << index; if (!composite.isComplete()) {
if (!isComplete() && flag == (1 << results.length) - 1) { composite.complete(composite);
complete(this);
} }
} else { } else {
if (!isComplete()) { composite.flag |= 1 << index;
fail(ar.cause()); if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) {
composite.fail(ar.cause());
} }
} }
}); });
} }
return composite;
}

private final Future[] results;
private int flag;


private CompositeFutureImpl(Future<?>... results) {
this.results = results; this.results = results;
} }


Expand Down
93 changes: 77 additions & 16 deletions src/test/java/io/vertx/test/core/FutureTest.java
Expand Up @@ -180,33 +180,71 @@ public void testFailFutureToHandler() {
} }


@Test @Test
public void testCompositeFutureSucceeded() { public void testAllSucceeded() {
Future<String> f1 = Future.future(); Future<String> f1 = Future.future();
Future<Integer> f2 = Future.future(); Future<Integer> f2 = Future.future();
CompositeFuture composite = CompositeFuture.all(f1, f2); CompositeFuture composite = CompositeFuture.all(f1, f2);
assertFalse(composite.succeeded()); assertNotCompleted(composite);
assertEquals(null, composite.<String>result(0));
assertEquals(null, composite.<Integer>result(1));
f1.complete("something"); f1.complete("something");
assertFalse(composite.succeeded()); assertNotCompleted(composite);
assertEquals("something", composite.result(0)); assertEquals("something", composite.result(0));
assertEquals(null, composite.<Integer>result(1)); assertEquals(null, composite.<Integer>result(1));
f2.complete(3); f2.complete(3);
assertTrue(composite.succeeded()); assertSucceeded(composite, composite);
assertEquals("something", composite.result(0)); assertEquals("something", composite.result(0));
assertEquals(3, (int)composite.result(1)); assertEquals(3, (int)composite.result(1));
} }


@Test @Test
public void testCompositeFutureFailed() { public void testAllFailed() {
Future<String> f1 = Future.future(); Future<String> f1 = Future.future();
Future<Integer> f2 = Future.future(); Future<Integer> f2 = Future.future();
CompositeFuture composite = CompositeFuture.all(f1, f2); CompositeFuture composite = CompositeFuture.all(f1, f2);
assertFalse(composite.succeeded());
f1.complete("s"); f1.complete("s");
assertFalse(composite.succeeded());
Exception cause = new Exception(); Exception cause = new Exception();
f2.fail(cause); f2.fail(cause);
assertTrue(composite.failed()); assertFailed(composite, cause);
assertEquals(cause, composite.cause()); assertEquals("s", composite.result(0));
assertEquals(null, composite.<Integer>result(1));
}

@Test
public void testAnySucceeded1() {
Future<String> f1 = Future.future();
Future<Integer> f2 = Future.future();
CompositeFuture composite = CompositeFuture.any(f1, f2);
assertNotCompleted(composite);
assertEquals(null, composite.<String>result(0));
assertEquals(null, composite.<Integer>result(1));
f1.complete("something");
assertSucceeded(composite, composite);
f2.complete(3);
assertSucceeded(composite, composite);
}

@Test
public void testAnySucceeded2() {
Future<String> f1 = Future.future();
Future<Integer> f2 = Future.future();
CompositeFuture composite = CompositeFuture.any(f1, f2);
f1.fail("failure");
assertNotCompleted(composite);
f2.complete(3);
assertSucceeded(composite, composite);
}

@Test
public void testAnyFailed() {
Future<String> f1 = Future.future();
Future<Integer> f2 = Future.future();
CompositeFuture composite = CompositeFuture.any(f1, f2);
f1.fail("failure");
assertNotCompleted(composite);
Throwable cause = new Exception();
f2.fail(cause);
assertFailed(composite, cause);
} }


@Test @Test
Expand All @@ -215,16 +253,17 @@ public void testComposeSucceed() {
Future<Integer> f2 = Future.future(); Future<Integer> f2 = Future.future();
f1.compose(string -> f2.complete(string.length()), f2); f1.compose(string -> f2.complete(string.length()), f2);
f1.complete("abcdef"); f1.complete("abcdef");
assertEquals(6, (int)f2.result()); assertSucceeded(f2, 6);
} }


@Test @Test
public void testComposeFail() { public void testComposeFail() {
Future<String> f1 = Future.future(); Future<String> f1 = Future.future();
Future<Integer> f2 = Future.future(); Future<Integer> f2 = Future.future();
f1.compose(string -> f2.complete(string.length()), f2); f1.compose(string -> f2.complete(string.length()), f2);
f1.fail("abcdef"); Exception cause = new Exception();
assertTrue(f2.failed()); f1.fail(cause);
assertFailed(f2, cause);
} }


@Test @Test
Expand All @@ -234,8 +273,7 @@ public void testComposeHandlerFail() {
RuntimeException cause = new RuntimeException(); RuntimeException cause = new RuntimeException();
f1.compose(string -> { throw cause; }, f2); f1.compose(string -> { throw cause; }, f2);
f1.complete("foo"); f1.complete("foo");
assertTrue(f2.failed()); assertFailed(f2, cause);
assertSame(cause, f2.cause());
} }


@Test @Test
Expand All @@ -252,7 +290,30 @@ public void testComposeHandlerFailAfterCompletion() {
} catch (Exception e) { } catch (Exception e) {
assertEquals(cause, e); assertEquals(cause, e);
} }
assertTrue(f2.succeeded()); assertSucceeded(f2, 46);
assertEquals(46, (int)f2.result()); }

private <T> void assertSucceeded(Future<T> future, T expected) {
assertTrue(future.isComplete());
assertTrue(future.succeeded());
assertFalse(future.failed());
assertNull(future.cause());
assertEquals(expected, future.result());
}

private <T> void assertFailed(Future<T> future, Throwable expected) {
assertTrue(future.isComplete());
assertFalse(future.succeeded());
assertTrue(future.failed());
assertEquals(expected, future.cause());
assertEquals(null, future.result());
}

private <T> void assertNotCompleted(Future<T> future) {
assertFalse(future.isComplete());
assertFalse(future.succeeded());
assertFalse(future.failed());
assertNull(future.cause());
assertNull(future.result());
} }
} }

0 comments on commit f250680

Please sign in to comment.