Skip to content

Commit

Permalink
Use correct thread pools
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Mar 26, 2015
1 parent 536c5fd commit bd8c1fd
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
Expand Up @@ -352,7 +352,7 @@ private synchronized void doFlush(Handler<AsyncResult<Void>> handler) {
} catch (IOException e) { } catch (IOException e) {
throw new FileSystemException(e); throw new FileSystemException(e);
} }
}, handler); }, true, handler);
} }


private void doWrite(ByteBuffer buff, long position, long toWrite, Handler<AsyncResult<Void>> handler) { private void doWrite(ByteBuffer buff, long position, long toWrite, Handler<AsyncResult<Void>> handler) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/file/impl/FileSystemImpl.java
Expand Up @@ -806,7 +806,7 @@ public BlockingAction(Handler<AsyncResult<T>> handler) {
* Run the blocking action using a thread from the worker pool. * Run the blocking action using a thread from the worker pool.
*/ */
public void run() { public void run() {
context.executeBlocking(this, handler); context.executeBlocking(this, true, handler);
} }
} }
} }
16 changes: 10 additions & 6 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -51,11 +51,13 @@ public abstract class ContextImpl implements Context {
private final ClassLoader tccl; private final ClassLoader tccl;
private final EventLoop eventLoop; private final EventLoop eventLoop;
protected final Executor orderedInternalPoolExec; protected final Executor orderedInternalPoolExec;
protected final Executor workerExec;
protected VertxThread contextThread; protected VertxThread contextThread;


protected ContextImpl(VertxInternal vertx, Executor orderedInternalPoolExec, String deploymentID, JsonObject config, protected ContextImpl(VertxInternal vertx, Executor orderedInternalPoolExec, Executor workerExec, String deploymentID, JsonObject config,
ClassLoader tccl) { ClassLoader tccl) {
this.orderedInternalPoolExec = orderedInternalPoolExec; this.orderedInternalPoolExec = orderedInternalPoolExec;
this.workerExec = workerExec;
this.deploymentID = deploymentID; this.deploymentID = deploymentID;
this.config = config; this.config = config;
EventLoopGroup group = vertx.getEventLoopGroup(); EventLoopGroup group = vertx.getEventLoopGroup();
Expand Down Expand Up @@ -196,17 +198,19 @@ public EventLoop getEventLoop() {
} }


// Execute an internal task on the internal blocking ordered executor // Execute an internal task on the internal blocking ordered executor
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) { public <T> void executeBlocking(Action<T> action, boolean internal, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, null, resultHandler); executeBlocking(action, null, internal, resultHandler);
} }


public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) { public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler); executeBlocking(null, blockingCodeHandler, false, resultHandler);
} }


public <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) { private <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler, boolean internal,
Handler<AsyncResult<T>> resultHandler) {
try { try {
orderedInternalPoolExec.execute(() -> { Executor exec = internal ? orderedInternalPoolExec : workerExec;
exec.execute(() -> {
Future<T> res = Future.future(); Future<T> res = Future.future();
try { try {
if (blockingCodeHandler != null) { if (blockingCodeHandler != null) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/DeploymentManager.java
Expand Up @@ -592,7 +592,7 @@ private void startRedeployTimer() {


private void doStartRedeployTimer() { private void doStartRedeployTimer() {
redeployTimerID = vertx.setTimer(options.getRedeployScanPeriod(), tid -> { redeployTimerID = vertx.setTimer(options.getRedeployScanPeriod(), tid -> {
vertx.executeBlocking(redeployer, res -> { vertx.executeBlockingInternal(redeployer, res -> {
if (res.succeeded()) { if (res.succeeded()) {
if (res.result()) { if (res.result()) {
doRedeploy(); doRedeploy();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/EventLoopContext.java
Expand Up @@ -32,9 +32,9 @@ public class EventLoopContext extends ContextImpl {


private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class); private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class);


public EventLoopContext(VertxInternal vertx, Executor bgExec, String deploymentID, JsonObject config, public EventLoopContext(VertxInternal vertx, Executor internalBlockingExec, Executor workerExec, String deploymentID, JsonObject config,
ClassLoader tccl) { ClassLoader tccl) {
super(vertx, bgExec, deploymentID, config, tccl); super(vertx, internalBlockingExec, workerExec, deploymentID, config, tccl);
} }


public void executeAsync(Handler<Void> task) { public void executeAsync(Handler<Void> task) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/HAManager.java
Expand Up @@ -283,7 +283,7 @@ private synchronized void checkQuorumWhenAdded(final String nodeID, final long s
} else { } else {
vertx.setTimer(200, tid -> { vertx.setTimer(200, tid -> {
// This can block on a monitor so it needs to run as a worker // This can block on a monitor so it needs to run as a worker
vertx.executeBlocking(() -> { vertx.executeBlockingInternal(() -> {
if (System.currentTimeMillis() - start > 10000) { if (System.currentTimeMillis() - start > 10000) {
log.warn("Timed out waiting for group information to appear"); log.warn("Timed out waiting for group information to appear");
} else if (!stopped) { } else if (!stopped) {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -345,7 +345,7 @@ public boolean cancelTimer(long id) {
} }


public EventLoopContext createEventLoopContext(String deploymentID, JsonObject config, ClassLoader tccl) { public EventLoopContext createEventLoopContext(String deploymentID, JsonObject config, ClassLoader tccl) {
return new EventLoopContext(this, workerOrderedFact.getExecutor(), deploymentID, config, tccl); return new EventLoopContext(this, internalOrderedFact.getExecutor(), workerOrderedFact.getExecutor(), deploymentID, config, tccl);
} }


@Override @Override
Expand Down Expand Up @@ -566,10 +566,16 @@ public Set<VerticleFactory> verticleFactories() {
return deploymentManager.verticleFactories(); return deploymentManager.verticleFactories();
} }


@Override
public <T> void executeBlockingInternal(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
ContextImpl context = getOrCreateContext();
context.executeBlocking(action, true, resultHandler);
}

@Override @Override
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) { public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
ContextImpl context = getOrCreateContext(); ContextImpl context = getOrCreateContext();
context.executeBlocking(action, resultHandler); context.executeBlocking(action, false, resultHandler);
} }


@Override @Override
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/vertx/core/impl/VertxInternal.java
Expand Up @@ -18,9 +18,11 @@




import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.http.impl.HttpServerImpl; import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.Action;
import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.net.impl.NetServerImpl; import io.vertx.core.net.impl.NetServerImpl;
import io.vertx.core.net.impl.ServerID; import io.vertx.core.net.impl.ServerID;
Expand Down Expand Up @@ -83,4 +85,6 @@ public interface VertxInternal extends VertxSPI {
String getNodeID(); String getNodeID();


File resolveFile(String fileName); File resolveFile(String fileName);

<T> void executeBlockingInternal(Action<T> action, Handler<AsyncResult<T>> resultHandler);
} }
5 changes: 1 addition & 4 deletions src/main/java/io/vertx/core/impl/WorkerContext.java
Expand Up @@ -28,12 +28,9 @@
*/ */
public class WorkerContext extends ContextImpl { public class WorkerContext extends ContextImpl {


protected final Executor workerExec;

public WorkerContext(VertxInternal vertx, Executor orderedInternalPoolExec, Executor workerExec, String deploymentID, public WorkerContext(VertxInternal vertx, Executor orderedInternalPoolExec, Executor workerExec, String deploymentID,
JsonObject config, ClassLoader tccl) { JsonObject config, ClassLoader tccl) {
super(vertx, orderedInternalPoolExec, deploymentID, config, tccl); super(vertx, orderedInternalPoolExec, workerExec, deploymentID, config, tccl);
this.workerExec = workerExec;
} }


@Override @Override
Expand Down

0 comments on commit bd8c1fd

Please sign in to comment.