Skip to content

Commit

Permalink
CompositeFutureImpl's all and any methods silently fail when composin…
Browse files Browse the repository at this point in the history
…g >32 Futures - fixes #1387
  • Loading branch information
vietj committed Apr 23, 2016
1 parent 8d27f61 commit e91a7a4
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 13 deletions.
20 changes: 10 additions & 10 deletions src/main/java/io/vertx/core/impl/CompositeFutureImpl.java
Expand Up @@ -28,14 +28,14 @@ public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult

public static CompositeFuture all(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
for (int i = 0;i < results.length;i++) {
int index = i;
int len = results.length;
for (int i = 0; i < len; i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
if (ar.succeeded()) {
synchronized (composite) {
composite.flag |= 1 << index;
if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
handler = composite.setSucceeded();
}
}
Expand All @@ -51,16 +51,16 @@ public static CompositeFuture all(Future<?>... results) {
}
});
}
if (results.length == 0) {
if (len == 0) {
composite.setSucceeded();
}
return composite;
}

public static CompositeFuture any(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
for (int i = 0;i < results.length;i++) {
int index = i;
int len = results.length;
for (int i = 0;i < len;i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
if (ar.succeeded()) {
Expand All @@ -71,8 +71,8 @@ public static CompositeFuture any(Future<?>... results) {
}
} else {
synchronized (composite) {
composite.flag |= 1 << index;
if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
handler = composite.setFailed(ar.cause());
}
}
Expand All @@ -89,7 +89,7 @@ public static CompositeFuture any(Future<?>... results) {
}

private final Future[] results;
private int flag;
private int count;
private boolean completed;
private Throwable cause;
private Handler<AsyncResult<CompositeFuture>> handler;
Expand Down
78 changes: 75 additions & 3 deletions src/test/java/io/vertx/test/core/FutureTest.java
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -241,6 +242,41 @@ private void testAllFailed(BiFunction<Future<String>, Future<Integer>, Composite
assertEquals(null, composite.<Integer>result(1));
}

@Test
public void testAllLargeList() {
testAllLargeList(63);
testAllLargeList(64);
testAllLargeList(65);
testAllLargeList(100);
}

private void testAllLargeList(int size) {
List<Future> list = new ArrayList<>();
for (int i = 0;i < size;i++) {
list.add(Future.succeededFuture());
}
CompositeFuture composite = CompositeFuture.all(list);
Checker<CompositeFuture> checker = new Checker<>(composite);
checker.assertSucceeded(composite);
for (int i = 0;i < size;i++) {
list.clear();
Throwable cause = new Exception();
for (int j = 0;j < size;j++) {
list.add(i == j ? Future.failedFuture(cause) : Future.succeededFuture());
}
composite = CompositeFuture.all(list);
checker = new Checker<>(composite);
checker.assertFailed(cause);
for (int j = 0;j < size;j++) {
if (i == j) {
assertTrue(composite.failed(j));
} else {
assertTrue(composite.succeeded(j));
}
}
}
}

@Test
public void testAnySucceeded1() {
testAnySucceeded1(CompositeFuture::any);
Expand Down Expand Up @@ -314,6 +350,40 @@ private void testAnyFailed(BiFunction<Future<String>, Future<Integer>, Composite
checker.assertFailed(cause);
}

@Test
public void testAnyLargeList() {
testAnyLargeList(63);
testAnyLargeList(64);
testAnyLargeList(65);
testAnyLargeList(100);
}

private void testAnyLargeList(int size) {
List<Future> list = new ArrayList<>();
for (int i = 0;i < size;i++) {
list.add(Future.failedFuture(new Exception()));
}
CompositeFuture composite = CompositeFuture.any(list);
Checker<CompositeFuture> checker = new Checker<>(composite);
checker.assertFailed();
for (int i = 0;i < size;i++) {
list.clear();
for (int j = 0;j < size;j++) {
list.add(i == j ? Future.succeededFuture() : Future.failedFuture(new RuntimeException()));
}
composite = CompositeFuture.any(list);
checker = new Checker<>(composite);
checker.assertSucceeded(composite);
for (int j = 0;j < size;j++) {
if (i == j) {
assertTrue(composite.succeeded(j));
} else {
assertTrue(composite.failed(j));
}
}
}
}

@Test
public void testCompositeFutureToList() {
Future<String> f1 = Future.future();
Expand Down Expand Up @@ -514,20 +584,22 @@ void assertSucceeded(T expected) {
}

void assertFailed(Throwable expected) {
assertEquals(expected, assertFailed());
}

Throwable assertFailed() {
assertTrue(future.isComplete());
assertFalse(future.succeeded());
assertTrue(future.failed());
assertEquals(expected, future.cause());
assertEquals(null, future.result());
assertEquals(1, count.get());
AsyncResult<T> ar = result.get();
assertNotNull(ar);
assertFalse(ar.succeeded());
assertTrue(ar.failed());
assertNull(ar.result());
assertEquals(expected, future.cause());
return future.cause();
}

}

/*
Expand Down

6 comments on commit e91a7a4

@dtolstyi
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job! Hope to see this feature soon in a new release.

@vietj
Copy link
Member Author

@vietj vietj commented on e91a7a4 Apr 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be in a 3.3

@dtolstyi
Copy link

@dtolstyi dtolstyi commented on e91a7a4 Apr 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @vietj.
Thank you for your response.
Are there any planned dates for 3.3? I'm thinking if I need to wait for 3.3 to fix this issue in my product or to replace faulty code with Observable as you've recommended.

@vietj
Copy link
Member Author

@vietj vietj commented on e91a7a4 Apr 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is planned for June. You could just fork the CompositeFutureImpl in your code in the meantime.

@vietj
Copy link
Member Author

@vietj vietj commented on e91a7a4 Apr 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or use Observable if you have more complex needs

@dtolstyi
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for information, @vietj.

Please sign in to comment.