Skip to content

Commit

Permalink
Named worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 28, 2016
1 parent f90ecff commit 4d8b0a5
Show file tree
Hide file tree
Showing 22 changed files with 806 additions and 78 deletions.
13 changes: 13 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Expand Up @@ -268,6 +268,10 @@ Set the isolated class names.
+++ +++
Set the isolation group that will be used when deploying the verticle(s) Set the isolation group that will be used when deploying the verticle(s)
+++ +++
|[[maxWorkerExecuteTime]]`maxWorkerExecuteTime`|`Number (long)`|
+++
Sets the value of max worker execute time, in ns.
+++
|[[multiThreaded]]`multiThreaded`|`Boolean`| |[[multiThreaded]]`multiThreaded`|`Boolean`|
+++ +++
Set whether the verticle(s) should be deployed as a multi-threaded worker verticle Set whether the verticle(s) should be deployed as a multi-threaded worker verticle
Expand All @@ -276,6 +280,15 @@ Set whether the verticle(s) should be deployed as a multi-threaded worker vertic
+++ +++
Set whether the verticle(s) should be deployed as a worker verticle Set whether the verticle(s) should be deployed as a worker verticle
+++ +++
|[[workerPoolName]]`workerPoolName`|`String`|
+++
Set the worker pool name to use for this verticle. When no name is set, the Vert.x
worker pool will be used, when a name is set, the verticle will use a named worker pool.
+++
|[[workerPoolSize]]`workerPoolSize`|`Number (int)`|
+++
Set the maximum number of worker threads to be used by the Vert.x instance.
+++
|=== |===


[[EventBusOptions]] [[EventBusOptions]]
Expand Down
55 changes: 55 additions & 0 deletions src/main/asciidoc/java/index.adoc
Expand Up @@ -291,6 +291,49 @@ An alternative way to run blocking code is to use a <<worker_verticles, worker v


A worker verticle is always executed with a thread from the worker pool. A worker verticle is always executed with a thread from the worker pool.


By default blocking code is executed on the Vert.x blocking code pool, configured with `link:../../apidocs/io/vertx/core/VertxOptions.html#setWorkerPoolSize-int-[setWorkerPoolSize]`.

Additional pools can be created for different purposes:

[source,java]
----
WorkerExecutor executor = vertx.createWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
----

The worker executor must be closed when it's not necessary anymore:

[source,java]
----
executor.close();
----

When several workers are created with the same name, they will share the same pool. The worker pool is destroyed
when all the worker executors using it are closed.

When an executor is created in a Verticle, Vert.x will close it automatically for you when the Verticle
is undeployed.

Worker executors can be configured when created:

[source,java]
----
int poolSize = 10;
// 2 minutes
long maxExecuteTime = 120000;
WorkerExecutor executor = vertx.createWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);
----

NOTE: the configuration is set when the worker pool is created

== Async coordination == Async coordination


Coordination of multiple asynchronous results can be achieved with Vert.x `link:../../apidocs/io/vertx/core/Future.html[futures]`. Coordination of multiple asynchronous results can be achieved with Vert.x `link:../../apidocs/io/vertx/core/Future.html[futures]`.
Expand Down Expand Up @@ -773,6 +816,18 @@ vertx.cancelTimer(timerID);
If you're creating timers from inside verticles, those timers will be automatically closed If you're creating timers from inside verticles, those timers will be automatically closed
when the verticle is undeployed. when the verticle is undeployed.


=== Verticle worker pool

Verticle use the Vert.x worker pool for executing blocking actions, i.e `link:../../apidocs/io/vertx/core/Context.html#executeBlocking-io.vertx.core.Handler-boolean-io.vertx.core.Handler-[executeBlocking]` or
worker verticle.

A different worker pool can be specified in deployment options:

[source,java]
----
vertx.deployVerticle("the-verticle", new DeploymentOptions().setWorkerPoolName("the-specific-pool"));
----

[[event_bus]] [[event_bus]]
include::eventbus.adoc[] include::eventbus.adoc[]


Expand Down
14 changes: 14 additions & 0 deletions src/main/generated/io/vertx/core/DeploymentOptionsConverter.java
Expand Up @@ -55,12 +55,21 @@ public static void fromJson(JsonObject json, DeploymentOptions obj) {
if (json.getValue("isolationGroup") instanceof String) { if (json.getValue("isolationGroup") instanceof String) {
obj.setIsolationGroup((String)json.getValue("isolationGroup")); obj.setIsolationGroup((String)json.getValue("isolationGroup"));
} }
if (json.getValue("maxWorkerExecuteTime") instanceof Number) {
obj.setMaxWorkerExecuteTime(((Number)json.getValue("maxWorkerExecuteTime")).longValue());
}
if (json.getValue("multiThreaded") instanceof Boolean) { if (json.getValue("multiThreaded") instanceof Boolean) {
obj.setMultiThreaded((Boolean)json.getValue("multiThreaded")); obj.setMultiThreaded((Boolean)json.getValue("multiThreaded"));
} }
if (json.getValue("worker") instanceof Boolean) { if (json.getValue("worker") instanceof Boolean) {
obj.setWorker((Boolean)json.getValue("worker")); obj.setWorker((Boolean)json.getValue("worker"));
} }
if (json.getValue("workerPoolName") instanceof String) {
obj.setWorkerPoolName((String)json.getValue("workerPoolName"));
}
if (json.getValue("workerPoolSize") instanceof Number) {
obj.setWorkerPoolSize(((Number)json.getValue("workerPoolSize")).intValue());
}
} }


public static void toJson(DeploymentOptions obj, JsonObject json) { public static void toJson(DeploymentOptions obj, JsonObject json) {
Expand All @@ -86,7 +95,12 @@ public static void toJson(DeploymentOptions obj, JsonObject json) {
if (obj.getIsolationGroup() != null) { if (obj.getIsolationGroup() != null) {
json.put("isolationGroup", obj.getIsolationGroup()); json.put("isolationGroup", obj.getIsolationGroup());
} }
json.put("maxWorkerExecuteTime", obj.getMaxWorkerExecuteTime());
json.put("multiThreaded", obj.isMultiThreaded()); json.put("multiThreaded", obj.isMultiThreaded());
json.put("worker", obj.isWorker()); json.put("worker", obj.isWorker());
if (obj.getWorkerPoolName() != null) {
json.put("workerPoolName", obj.getWorkerPoolName());
}
json.put("workerPoolSize", obj.getWorkerPoolSize());
} }
} }
30 changes: 30 additions & 0 deletions src/main/java/examples/CoreExamples.java
Expand Up @@ -79,6 +79,32 @@ public void example7(Vertx vertx) {
}); });
} }


public void workerExecutor1(Vertx vertx) {
WorkerExecutor executor = vertx.createWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
}

public void workerExecutor2(WorkerExecutor executor) {
executor.close();
}

public void workerExecutor3(Vertx vertx) {
//
// 10 threads max
int poolSize = 10;

// 2 minutes
long maxExecuteTime = 120000;

WorkerExecutor executor = vertx.createWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);
}

BlockingAPI someAPI = new BlockingAPI(); BlockingAPI someAPI = new BlockingAPI();


class BlockingAPI { class BlockingAPI {
Expand Down Expand Up @@ -284,4 +310,8 @@ public void configureDNSServers() {
); );
} }


public void deployVerticleWithDifferentWorkerPool(Vertx vertx) {
vertx.deployVerticle("the-verticle", new DeploymentOptions().setWorkerPoolName("the-specific-pool"));
}

} }
80 changes: 80 additions & 0 deletions src/main/java/io/vertx/core/DeploymentOptions.java
Expand Up @@ -43,6 +43,9 @@ public class DeploymentOptions {
private boolean worker; private boolean worker;
private boolean multiThreaded; private boolean multiThreaded;
private String isolationGroup; private String isolationGroup;
private String workerPoolName;
private int workerPoolSize;
private long maxWorkerExecuteTime;
private boolean ha; private boolean ha;
private List<String> extraClasspath; private List<String> extraClasspath;
private int instances; private int instances;
Expand All @@ -58,6 +61,9 @@ public DeploymentOptions() {
this.isolationGroup = DEFAULT_ISOLATION_GROUP; this.isolationGroup = DEFAULT_ISOLATION_GROUP;
this.ha = DEFAULT_HA; this.ha = DEFAULT_HA;
this.instances = DEFAULT_INSTANCES; this.instances = DEFAULT_INSTANCES;
this.workerPoolName = null;
this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE;
this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME;
} }


/** /**
Expand Down Expand Up @@ -274,6 +280,80 @@ public DeploymentOptions setIsolatedClasses(List<String> isolatedClasses) {
return this; return this;
} }


/**
* @return the worker pool name
*/
public String getWorkerPoolName() {
return workerPoolName;
}

/**
* Set the worker pool name to use for this verticle. When no name is set, the Vert.x
* worker pool will be used, when a name is set, the verticle will use a named worker pool.
*
* @param workerPoolName the worker pool name
* @return a reference to this, so the API can be used fluently
*/
public DeploymentOptions setWorkerPoolName(String workerPoolName) {
this.workerPoolName = workerPoolName;
return this;
}

/**
* Get the maximum number of worker threads to be used by the worker pool when the verticle is deployed
* with a {@link #setWorkerPoolName}. When the verticle does not use a named worker pool, this option
* has no effect.
* <p>
* Worker threads are used for running blocking code and worker verticles.
*
* @return the maximum number of worker threads
*/
public int getWorkerPoolSize() {
return workerPoolSize;
}

/**
* Set the maximum number of worker threads to be used by the Vert.x instance.
*
* @param workerPoolSize the number of threads
* @return a reference to this, so the API can be used fluently
*/
public DeploymentOptions setWorkerPoolSize(int workerPoolSize) {
if (workerPoolSize < 1) {
throw new IllegalArgumentException("workerPoolSize must be > 0");
}
this.workerPoolSize = workerPoolSize;
return this;
}

/**
* Get the value of max worker execute time, in ns.
* <p>
* Vert.x will automatically log a warning if it detects that worker threads haven't returned within this time.
* <p>
* This can be used to detect where the user is blocking a worker thread for too long. Although worker threads
* can be blocked longer than event loop threads, they shouldn't be blocked for long periods of time.
*
* @return The value of max worker execute time, in ms.
*/
public long getMaxWorkerExecuteTime() {
return maxWorkerExecuteTime;
}

/**
* Sets the value of max worker execute time, in ns.
*
* @param maxWorkerExecuteTime the value of max worker execute time, in ms.
* @return a reference to this, so the API can be used fluently
*/
public DeploymentOptions setMaxWorkerExecuteTime(long maxWorkerExecuteTime) {
if (maxWorkerExecuteTime < 1) {
throw new IllegalArgumentException("maxWorkerpExecuteTime must be > 0");
}
this.maxWorkerExecuteTime = maxWorkerExecuteTime;
return this;
}

/** /**
* Convert this to JSON * Convert this to JSON
* *
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/Vertx.java
Expand Up @@ -480,6 +480,33 @@ static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> res
@GenIgnore @GenIgnore
EventLoopGroup nettyEventLoopGroup(); EventLoopGroup nettyEventLoopGroup();


/**
* Like {@link #createWorkerExecutor(String, int)} but with the {@link VertxOptions#setWorkerPoolSize} {@code poolSize}.
*/
WorkerExecutor createWorkerExecutor(String name);

/**
* Like {@link #createWorkerExecutor(String, int, long)} but with the {@link VertxOptions#setMaxWorkerExecuteTime} {@code maxExecuteTime}.
*/
WorkerExecutor createWorkerExecutor(String name, int poolSize);

/**
* Create a named worker executor, the executor should be closed when it's not needed anymore to release
* resources.<p/>
*
* This method can be called mutiple times with the same {@code name}. Executors with the same name will share
* the same worker pool. The worker pool size and max execute time are set when the worker pool is created and
* won't change after.<p>
*
* The worker pool is released when all the {@link WorkerExecutor} sharing the same name are closed.
*
* @param name the name of the worker executor
* @param poolSize the size of the pool
* @param maxExecuteTime the value of max worker execute time, in ms
* @return the named worker executor
*/
WorkerExecutor createWorkerExecutor(String name, int poolSize, long maxExecuteTime);

/** /**
* Set a default exception handler for {@link Context}, set on {@link Context#exceptionHandler(Handler)} at creation. * Set a default exception handler for {@link Context}, set on {@link Context#exceptionHandler(Handler)} at creation.
* *
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/io/vertx/core/WorkerExecutor.java
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core;

import io.vertx.codegen.annotations.VertxGen;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@VertxGen
public interface WorkerExecutor {

/**
* Safely execute some blocking code.
* <p>
* Executes the blocking code in the handler {@code blockingCodeHandler} using a thread from the worker pool.
* <p>
* When the code is complete the handler {@code resultHandler} will be called with the result on the original context
* (e.g. on the original event loop of the caller).
* <p>
* A {@code Future} instance is passed into {@code blockingCodeHandler}. When the blocking code successfully completes,
* the handler should call the {@link Future#complete} or {@link Future#complete(Object)} method, or the {@link Future#fail}
* method if it failed.
* <p>
* In the {@code blockingCodeHandler} the current context remains the original context and therefore any task
* scheduled in the {@code blockingCodeHandler} will be executed on the this context and not on the worker thread.
*
* @param blockingCodeHandler handler representing the blocking code to run
* @param resultHandler handler that will be called when the blocking code is complete
* @param ordered if true then if executeBlocking is called several times on the same context, the executions
* for that context will be executed serially, not in parallel. if false then they will be no ordering
* guarantees
* @param <T> the type of the result
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler);

/**
* Like {@link #executeBlocking(Handler, boolean, Handler)} called with ordered = true.
*/
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler);

/**
* Close the executor.
*/
void close();

}
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/BlockedThreadChecker.java
Expand Up @@ -36,7 +36,7 @@ public class BlockedThreadChecker {
private final Map<VertxThread, Object> threads = new WeakHashMap<>(); private final Map<VertxThread, Object> threads = new WeakHashMap<>();
private final Timer timer; // Need to use our own timer - can't use event loop for this private final Timer timer; // Need to use our own timer - can't use event loop for this


BlockedThreadChecker(long interval, long maxEventLoopExecTime, long maxWorkerExecTime, long warningExceptionTime) { BlockedThreadChecker(long interval, long warningExceptionTime) {
timer = new Timer("vertx-blocked-thread-checker", true); timer = new Timer("vertx-blocked-thread-checker", true);
timer.schedule(new TimerTask() { timer.schedule(new TimerTask() {
@Override @Override
Expand All @@ -46,7 +46,7 @@ public void run() {
for (VertxThread thread : threads.keySet()) { for (VertxThread thread : threads.keySet()) {
long execStart = thread.startTime(); long execStart = thread.startTime();
long dur = now - execStart; long dur = now - execStart;
final long timeLimit = thread.isWorker() ? maxWorkerExecTime : maxEventLoopExecTime; final long timeLimit = thread.getMaxExecTime();
if (execStart != 0 && dur > timeLimit) { if (execStart != 0 && dur > timeLimit) {
final String message = "Thread " + thread + " has been blocked for " + (dur / 1000000) + " ms, time limit is " + (timeLimit / 1000000); final String message = "Thread " + thread + " has been blocked for " + (dur / 1000000) + " ms, time limit is " + (timeLimit / 1000000);
if (dur <= warningExceptionTime) { if (dur <= warningExceptionTime) {
Expand Down

0 comments on commit 4d8b0a5

Please sign in to comment.