Skip to content

Commit

Permalink
Change the internal executeBlocking method to use an Handler<Future<T…
Browse files Browse the repository at this point in the history
…>> instead of Action<T> and remove the related implementation specific Action interface
  • Loading branch information
vietj committed Apr 5, 2018
1 parent 85aa775 commit b8a5ca7
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 54 deletions.
7 changes: 4 additions & 3 deletions src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
Expand Up @@ -13,6 +13,7 @@

import io.netty.buffer.ByteBuf;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -358,10 +359,10 @@ private synchronized void handleEnd() {

private synchronized void doFlush(Handler<AsyncResult<Void>> handler) {
checkClosed();
context.executeBlocking(() -> {
context.executeBlockingInternal((Future<Void> fut) -> {
try {
ch.force(false);
return null;
fut.complete();
} catch (IOException e) {
throw new FileSystemException(e);
}
Expand Down Expand Up @@ -466,7 +467,7 @@ private void checkContext() {

private void doClose(Handler<AsyncResult<Void>> handler) {
ContextInternal handlerContext = vertx.getOrCreateContext();
handlerContext.executeBlocking(res -> {
handlerContext.executeBlockingInternal(res -> {
try {
ch.close();
res.complete(null);
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/io/vertx/core/file/impl/FileSystemImpl.java
Expand Up @@ -12,6 +12,7 @@
package io.vertx.core.file.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
Expand All @@ -21,7 +22,6 @@
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.FileSystemProps;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.impl.Action;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

Expand Down Expand Up @@ -823,7 +823,7 @@ public FileSystemProps perform() {
};
}

protected abstract class BlockingAction<T> implements Action<T> {
protected abstract class BlockingAction<T> implements Handler<Future<T>> {

private final Handler<AsyncResult<T>> handler;
protected final ContextInternal context;
Expand All @@ -836,8 +836,21 @@ public BlockingAction(Handler<AsyncResult<T>> handler) {
* Run the blocking action using a thread from the worker pool.
*/
public void run() {
context.executeBlocking(this, handler);
context.executeBlockingInternal(this, handler);
}

@Override
public void handle(Future<T> fut) {
try {
T result = perform();
fut.complete(result);
} catch (Exception e) {
fut.fail(e);
}
}

public abstract T perform();

}

// Visible for testing
Expand Down
19 changes: 0 additions & 19 deletions src/main/java/io/vertx/core/impl/Action.java

This file was deleted.

21 changes: 8 additions & 13 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -221,13 +221,13 @@ public VertxInternal owner() {
}

@Override
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, null, resultHandler, internalBlockingPool.executor(), internalOrderedTasks, internalBlockingPool.metrics());
public <T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, resultHandler, internalBlockingPool.executor(), internalOrderedTasks, internalBlockingPool.metrics());
}

@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, workerPool.executor(), ordered ? orderedTasks : null, workerPool.metrics());
executeBlocking(blockingCodeHandler, resultHandler, workerPool.executor(), ordered ? orderedTasks : null, workerPool.metrics());
}

@Override
Expand All @@ -237,10 +237,10 @@ public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<

@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, workerPool.executor(), queue, workerPool.metrics());
executeBlocking(blockingCodeHandler, resultHandler, workerPool.executor(), queue, workerPool.metrics());
}

<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, TaskQueue queue, PoolMetrics metrics) {
Object queueMetric = metrics != null ? metrics.submitted() : null;
Expand All @@ -256,15 +256,10 @@ <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandle
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} catch (Throwable e) {
res.fail(e);
res.tryFail(e);
} finally {
if (!DISABLE_TIMINGS) {
current.executeEnd();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/ContextInternal.java
Expand Up @@ -59,7 +59,7 @@ static boolean isOnVertxThread() {
/**
* Execute an internal task on the internal blocking ordered executor.
*/
<T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler);
<T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler);

/**
* @return the deployment associated with this context or {@code null}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/HAManager.java
Expand Up @@ -339,7 +339,7 @@ private synchronized void checkQuorumWhenAdded(final String nodeID, final long s
} else {
vertx.setTimer(200, tid -> {
// This can block on a monitor so it needs to run as a worker
vertx.executeBlockingInternal(() -> {
vertx.executeBlockingInternal(fut -> {
if (System.currentTimeMillis() - start > 10000) {
log.warn("Timed out waiting for group information to appear");
} else if (!stopped) {
Expand All @@ -352,7 +352,7 @@ private synchronized void checkQuorumWhenAdded(final String nodeID, final long s
ContextImpl.setContext((ContextImpl) context);
}
}
return null;
fut.complete();
}, null);
});
}
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -14,9 +14,6 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultAddressResolverGroup;
import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsServerAddressStream;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -706,10 +703,10 @@ public Set<VerticleFactory> verticleFactories() {
}

@Override
public <T> void executeBlockingInternal(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
public <T> void executeBlockingInternal(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
ContextImpl context = getOrCreateContext();

context.executeBlocking(action, resultHandler);
context.executeBlockingInternal(blockingCodeHandler, resultHandler);
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/vertx/core/impl/VertxInternal.java
Expand Up @@ -15,10 +15,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.impl.NetServerImpl;
Expand Down Expand Up @@ -98,7 +95,10 @@ public interface VertxInternal extends Vertx {

File resolveFile(String fileName);

<T> void executeBlockingInternal(Action<T> action, Handler<AsyncResult<T>> resultHandler);
/**
* Like {@link #executeBlocking(Handler, Handler)} but using the internal worker thread pool.
*/
<T> void executeBlockingInternal(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler);

ClusterManager getClusterManager();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java
Expand Up @@ -61,7 +61,7 @@ public synchronized <T> void executeBlocking(Handler<Future<T>> blockingCodeHand
throw new IllegalStateException("Worker executor closed");
}
ContextImpl context = (ContextImpl) vertx.getOrCreateContext();
context.executeBlocking(null, blockingCodeHandler, asyncResultHandler, pool.executor(), ordered ? context.orderedTasks : null, pool.metrics());
context.executeBlocking(blockingCodeHandler, asyncResultHandler, pool.executor(), ordered ? context.orderedTasks : null, pool.metrics());
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/vertx/test/core/MetricsTest.java
Expand Up @@ -939,7 +939,7 @@ public void testThreadPoolMetricsWithInternalExecuteBlocking() {
int count = num * 5;
for (int i = 0; i < count; i++) {
CountDownLatch latch = latches.computeIfAbsent(i / num, k -> new CountDownLatch(num));
v.executeBlockingInternal(() -> {
v.executeBlockingInternal(fut -> {
latch.countDown();
try {
awaitLatch(latch);
Expand All @@ -950,7 +950,8 @@ public void testThreadPoolMetricsWithInternalExecuteBlocking() {
if (metrics.numberOfRunningTasks() > 0) {
hadRunning.set(true);
}
return null; }, ar -> {
fut.complete();
}, ar -> {
if (metrics.numberOfWaitingTasks() > 0) {
hadWaitingQueue.set(true);
}
Expand Down

0 comments on commit b8a5ca7

Please sign in to comment.