Skip to content

Commit

Permalink
Rework for thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jan 15, 2016
1 parent 4a4b1d0 commit 374c599
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 44 deletions.
2 changes: 2 additions & 0 deletions src/main/asciidoc/java/index.adoc
Expand Up @@ -331,6 +331,8 @@ CompositeFuture.any(future1, future2).setHandler(ar -> {
}); });
---- ----


`link:../../apidocs/io/vertx/core/Future.html#compose-io.vertx.core.Handler-io.vertx.core.Future-[compose]` chains futures:

== Verticles == Verticles


Vert.x comes with a simple, scalable, _actor-like_ deployment and concurrency model out of the box that Vert.x comes with a simple, scalable, _actor-like_ deployment and concurrency model out of the box that
Expand Down
17 changes: 16 additions & 1 deletion src/main/java/io/vertx/core/Future.java
Expand Up @@ -147,7 +147,22 @@ static <T> Future<T> failedFuture(String failureMessage) {
* @param handler the handler * @param handler the handler
* @param next the next future * @param next the next future
*/ */
<U> void compose(Handler<T> handler, Future<U> next); default <U> void compose(Handler<T> handler, Future<U> next) {
setHandler(ar -> {
if (ar.succeeded()) {
try {
handler.handle(ar.result());
} catch (Throwable err) {
if (next.isComplete()) {
throw err;
}
next.fail(err);
}
} else {
next.fail(ar.cause());
}
});
}


/** /**
* @return an handler completing this future * @return an handler completing this future
Expand Down
151 changes: 134 additions & 17 deletions src/main/java/io/vertx/core/impl/CompositeFutureImpl.java
Expand Up @@ -24,23 +24,31 @@
/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
public class CompositeFutureImpl extends FutureImpl<CompositeFuture> implements CompositeFuture { public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult<CompositeFuture>> {


public static CompositeFuture all(Future<?>... results) { public static CompositeFuture all(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results); CompositeFutureImpl composite = new CompositeFutureImpl(results);
for (int i = 0;i < results.length;i++) { for (int i = 0;i < results.length;i++) {
int index = i; int index = i;
results[i].setHandler(ar -> { results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
if (ar.succeeded()) { if (ar.succeeded()) {
composite.flag |= 1 << index; synchronized (composite) {
if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) { composite.flag |= 1 << index;
composite.complete(composite); if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) {
handler = composite.setSucceeded();
}
} }
} else { } else {
if (!composite.isComplete()) { synchronized (composite) {
composite.fail(ar.cause()); if (!composite.isComplete()) {
handler = composite.setFailed(ar.cause());
}
} }
} }
if (handler != null) {
handler.handle(composite);
}
}); });
} }
return composite; return composite;
Expand All @@ -51,63 +59,172 @@ public static CompositeFuture any(Future<?>... results) {
for (int i = 0;i < results.length;i++) { for (int i = 0;i < results.length;i++) {
int index = i; int index = i;
results[i].setHandler(ar -> { results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
if (ar.succeeded()) { if (ar.succeeded()) {
if (!composite.isComplete()) { synchronized (composite) {
composite.complete(composite); if (!composite.isComplete()) {
composite.setSucceeded();
}
} }
} else { } else {
composite.flag |= 1 << index; synchronized (composite) {
if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) { composite.flag |= 1 << index;
composite.fail(ar.cause()); if (!composite.isComplete() && composite.flag == (1 << results.length) - 1) {
handler = composite.setFailed(ar.cause());
}
} }
} }
if (handler != null) {
handler.handle(composite);
}
}); });
} }
return composite; return composite;
} }


private final Future[] results; private final Future[] results;
private int flag; private int flag;
private boolean completed;
private Throwable cause;
private Handler<AsyncResult<CompositeFuture>> handler;


private CompositeFutureImpl(Future<?>... results) { private CompositeFutureImpl(Future<?>... results) {
this.results = results; this.results = results;
} }


@Override @Override
public CompositeFuture setHandler(Handler<AsyncResult<CompositeFuture>> handler) { public CompositeFuture setHandler(Handler<AsyncResult<CompositeFuture>> handler) {
return (CompositeFuture) super.setHandler(handler); boolean call;
synchronized (this) {
this.handler = handler;
call = completed;
}
if (call) {
handler.handle(this);
}
return this;
} }


@Override @Override
public Throwable cause(int index) { public Throwable cause(int index) {
return results[index].cause(); return future(index).cause();
} }


@Override @Override
public boolean succeeded(int index) { public boolean succeeded(int index) {
return results[index].succeeded(); return future(index).succeeded();
} }


@Override @Override
public boolean failed(int index) { public boolean failed(int index) {
return results[index].failed(); return future(index).failed();
} }


@Override @Override
public boolean isComplete(int index) { public boolean isComplete(int index) {
return results[index].isComplete(); return future(index).isComplete();
} }


@Override @Override
public <T> T result(int index) { public <T> T result(int index) {
return this.<T>future(index).result();
}

private <T> Future<T> future(int index) {
if (index < 0 || index > results.length) { if (index < 0 || index > results.length) {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
return (T) results[index].result(); return (Future<T>) results[index];
} }


@Override @Override
public int size() { public int size() {
return results.length; return results.length;
} }

@Override
public synchronized boolean isComplete() {
return completed;
}

@Override
public synchronized boolean succeeded() {
return completed && cause == null;
}

@Override
public synchronized boolean failed() {
return completed && cause != null;
}

@Override
public synchronized Throwable cause() {
return completed && cause != null ? cause : null;
}

@Override
public synchronized CompositeFuture result() {
return completed && cause == null ? this : null;
}

@Override
public void complete(CompositeFuture result) {
Handler<AsyncResult<CompositeFuture>> handler = setSucceeded();
if (handler != null) {
handler.handle(this);
}
}

@Override
public void complete() {
complete(null);
}

@Override
public void fail(Throwable throwable) {
Handler<AsyncResult<CompositeFuture>> handler = setFailed(throwable);
if (handler != null) {
handler.handle(this);
}
}

@Override
public void fail(String failureMessage) {
fail(new NoStackTraceThrowable(failureMessage));
}

private Handler<AsyncResult<CompositeFuture>> setFailed(Throwable cause) {
synchronized (this) {
if (completed) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
}
this.completed = true;
this.cause = cause;
return handler;
}
}

private Handler<AsyncResult<CompositeFuture>> setSucceeded() {
synchronized (this) {
if (completed) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
}
this.completed = true;
return handler;
}
}

@Override
public Handler<AsyncResult<CompositeFuture>> handler() {
return this;
}

@Override
public void handle(AsyncResult<CompositeFuture> ar) {
if (ar.succeeded()) {
complete(this);
} else {
fail(ar.cause());
}
}
} }
37 changes: 11 additions & 26 deletions src/main/java/io/vertx/core/impl/FutureImpl.java
Expand Up @@ -20,7 +20,7 @@
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;


class FutureImpl<T> implements Future<T> { class FutureImpl<T> implements Future<T>, Handler<AsyncResult<T>> {
private boolean failed; private boolean failed;
private boolean succeeded; private boolean succeeded;
private Handler<AsyncResult<T>> handler; private Handler<AsyncResult<T>> handler;
Expand Down Expand Up @@ -126,13 +126,16 @@ public void handle(Future<T> ar) {


@Override @Override
public Handler<AsyncResult<T>> handler() { public Handler<AsyncResult<T>> handler() {
return ar -> { return this;
if (ar.succeeded()) { }
complete(ar.result());
} else { @Override
fail(ar.cause()); public void handle(AsyncResult<T> ar) {
} if (ar.succeeded()) {
}; complete(ar.result());
} else {
fail(ar.cause());
}
} }


/** /**
Expand All @@ -145,24 +148,6 @@ public void fail(Throwable throwable) {
checkCallHandler(); checkCallHandler();
} }


@Override
public <U> void compose(Handler<T> handler, Future<U> next) {
setHandler(ar -> {
if (ar.succeeded()) {
try {
handler.handle(ar.result());
} catch (Throwable err) {
if (next.isComplete()) {
throw err;
}
next.fail(err);
}
} else {
next.fail(ar.cause());
}
});
}

@Override @Override
public void fail(String failureMessage) { public void fail(String failureMessage) {
fail(new NoStackTraceThrowable(failureMessage)); fail(new NoStackTraceThrowable(failureMessage));
Expand Down

0 comments on commit 374c599

Please sign in to comment.