Skip to content

Commit

Permalink
Minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 29, 2016
1 parent f24aa63 commit e69a27b
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 54 deletions.
16 changes: 9 additions & 7 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class ContextImpl implements ContextInternal {
private static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME); private static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME);
private static final boolean DISABLE_TCCL = Boolean.getBoolean(DISABLE_TCCL_PROP_NAME); private static final boolean DISABLE_TCCL = Boolean.getBoolean(DISABLE_TCCL_PROP_NAME);


protected final VertxImpl owner; protected final VertxInternal owner;
protected final String deploymentID; protected final String deploymentID;
protected final JsonObject config; protected final JsonObject config;
private Deployment deployment; private Deployment deployment;
Expand All @@ -58,10 +58,11 @@ public abstract class ContextImpl implements ContextInternal {
private Map<String, Object> contextData; private Map<String, Object> contextData;
private volatile Handler<Throwable> exceptionHandler; private volatile Handler<Throwable> exceptionHandler;
protected final WorkerPool workerPool; protected final WorkerPool workerPool;
protected final WorkerPool internalBlockingPool;
protected final Executor orderedInternalPoolExec; protected final Executor orderedInternalPoolExec;
protected final Executor workerExec; protected final Executor workerExec;


protected ContextImpl(VertxInternal vertx, WorkerPool workerPool, String deploymentID, JsonObject config, protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) { ClassLoader tccl) {
if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) { if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) {
log.warn("You have disabled TCCL checks but you have a custom TCCL to set."); log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");
Expand All @@ -75,11 +76,12 @@ protected ContextImpl(VertxInternal vertx, WorkerPool workerPool, String deploym
this.eventLoop = null; this.eventLoop = null;
} }
this.tccl = tccl; this.tccl = tccl;
this.owner = (VertxImpl) vertx; this.owner = vertx;
this.exceptionHandler = vertx.exceptionHandler(); this.exceptionHandler = vertx.exceptionHandler();
this.workerPool = workerPool; this.workerPool = workerPool;
this.orderedInternalPoolExec = owner.internalOrderedFact.getExecutor(); this.internalBlockingPool = internalBlockingPool;
this.workerExec = workerPool.workerOrderedFact.getExecutor(); this.orderedInternalPoolExec = internalBlockingPool.createOrderedExecutor();
this.workerExec = workerPool.createOrderedExecutor();
} }


public static void setContext(ContextImpl context) { public static void setContext(ContextImpl context) {
Expand Down Expand Up @@ -267,12 +269,12 @@ public Vertx owner() {


// Execute an internal task on the internal blocking ordered executor // Execute an internal task on the internal blocking ordered executor
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) { public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, null, resultHandler, orderedInternalPoolExec, owner.internalBlockingPoolMetrics); executeBlocking(action, null, resultHandler, orderedInternalPoolExec, internalBlockingPool.metrics());
} }


@Override @Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) { public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.workerPool, workerPool.workerMetrics); executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.executor(), workerPool.metrics());
} }


@Override @Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/EventLoopContext.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class EventLoopContext extends ContextImpl {


private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class); private static final Logger log = LoggerFactory.getLogger(EventLoopContext.class);


public EventLoopContext(VertxInternal vertx, WorkerPool workerPool, String deploymentID, JsonObject config, public EventLoopContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) { ClassLoader tccl) {
super(vertx, workerPool, deploymentID, config, tccl); super(vertx, internalBlockingPool, workerPool, deploymentID, config, tccl);
} }


public void executeAsync(Handler<Void> task) { public void executeAsync(Handler<Void> task) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
*/ */
public class MultiThreadedWorkerContext extends WorkerContext { public class MultiThreadedWorkerContext extends WorkerContext {


public MultiThreadedWorkerContext(VertxInternal vertx, WorkerPool workerPool, public MultiThreadedWorkerContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool,
String deploymentID, JsonObject config, ClassLoader tccl) { String deploymentID, JsonObject config, ClassLoader tccl) {
super(vertx, workerPool, deploymentID, config, tccl); super(vertx, internalBlockingPool, workerPool, deploymentID, config, tccl);
} }


@Override @Override
public void executeAsync(Handler<Void> task) { public void executeAsync(Handler<Void> task) {
workerPool.workerPool.execute(wrapTask(null, task, false, workerPool.workerMetrics)); workerPool.executor().execute(wrapTask(null, task, false, workerPool.metrics()));
} }


@Override @Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/NamedWorkerExecutor.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class NamedWorkerExecutor implements WorkerExecutor, Closeable {
public NamedWorkerExecutor(ContextImpl context, VertxImpl.NamedWorkerPool pool) { public NamedWorkerExecutor(ContextImpl context, VertxImpl.NamedWorkerPool pool) {
this.pool = pool; this.pool = pool;
this.context = context; this.context = context;
this.workerExec = pool.workerOrderedFact.getExecutor(); this.workerExec = pool.createOrderedExecutor();
} }


public WorkerPool getPool() { public WorkerPool getPool() {
Expand All @@ -48,7 +48,7 @@ public synchronized <T> void executeBlocking(Handler<Future<T>> blockingCodeHand
if (closed) { if (closed) {
throw new IllegalStateException("Worker executor closed"); throw new IllegalStateException("Worker executor closed");
} }
context.executeBlocking(null, blockingCodeHandler, asyncResultHandler, ordered ? workerExec : pool.workerPool, pool.workerMetrics); context.executeBlocking(null, blockingCodeHandler, asyncResultHandler, ordered ? workerExec : pool.executor(), pool.metrics());
} }


public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> asyncResultHandler) { public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> asyncResultHandler) {
Expand Down
46 changes: 17 additions & 29 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -95,12 +95,8 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final FileResolver fileResolver; private final FileResolver fileResolver;
private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>(); private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>(); private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
private final ExecutorService workerPool; private final WorkerPool workerPool;
private final ThreadPoolMetrics workerPoolMetrics; private final WorkerPool internalBlockingPool;
private final ExecutorService internalBlockingPool;
final ThreadPoolMetrics internalBlockingPoolMetrics;
private final OrderedExecutorFactory workerOrderedFact;
final OrderedExecutorFactory internalOrderedFact;
private final ThreadFactory eventLoopThreadFactory; private final ThreadFactory eventLoopThreadFactory;
private final NioEventLoopGroup eventLoopGroup; private final NioEventLoopGroup eventLoopGroup;
private final NioEventLoopGroup acceptorEventLoopGroup; private final NioEventLoopGroup acceptorEventLoopGroup;
Expand All @@ -112,7 +108,6 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private boolean closed; private boolean closed;
private Handler<Throwable> exceptionHandler; private Handler<Throwable> exceptionHandler;
private final Map<String, NamedWorkerPool> namedWorkerPools; private final Map<String, NamedWorkerPool> namedWorkerPools;
private final WorkerPool vertxWorkerPool;
private final int defaultWorkerPoolSize; private final int defaultWorkerPoolSize;
private final long defaultWorkerMaxExecTime; private final long defaultWorkerMaxExecTime;


Expand Down Expand Up @@ -141,16 +136,15 @@ public class VertxImpl implements VertxInternal, MetricsProvider {


metrics = initialiseMetrics(options); metrics = initialiseMetrics(options);


workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-worker-thread", options.getWorkerPoolSize()) : null; ThreadPoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(), ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime())); new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; ThreadPoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime())); new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
internalOrderedFact = new OrderedExecutorFactory(internalBlockingPool); internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
namedWorkerPools = new HashMap<>(); namedWorkerPools = new HashMap<>();
vertxWorkerPool = new WorkerPool(workerPool, workerPoolMetrics); workerPool = new WorkerPool(workerExec, workerPoolMetrics);
workerOrderedFact = vertxWorkerPool.workerOrderedFact;
defaultWorkerPoolSize = options.getWorkerPoolSize(); defaultWorkerPoolSize = options.getWorkerPoolSize();
defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime(); defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime();


Expand Down Expand Up @@ -298,7 +292,7 @@ public void runOnContext(Handler<Void> task) {


// The background pool is used for making blocking calls to legacy synchronous APIs // The background pool is used for making blocking calls to legacy synchronous APIs
public ExecutorService getWorkerPool() { public ExecutorService getWorkerPool() {
return workerPool; return workerPool.executor();
} }


public EventLoopGroup getEventLoopGroup() { public EventLoopGroup getEventLoopGroup() {
Expand Down Expand Up @@ -347,18 +341,18 @@ public boolean cancelTimer(long id) {
} }


public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) { public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
return new EventLoopContext(this, workerPool != null ? workerPool : vertxWorkerPool, deploymentID, config, tccl); return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);
} }


public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config, public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config,
ClassLoader tccl) { ClassLoader tccl) {
if (workerPool == null) { if (workerPool == null) {
workerPool = vertxWorkerPool; workerPool = this.workerPool;
} }
if (multiThreaded) { if (multiThreaded) {
return new MultiThreadedWorkerContext(this, workerPool, deploymentID, config, tccl); return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl);
} else { } else {
return new WorkerContext(this, workerPool, deploymentID, config, tccl); return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl);
} }
} }


Expand Down Expand Up @@ -689,14 +683,8 @@ public void resolveHostname(String hostname, Handler<AsyncResult<InetAddress>> r
private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) { private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) {
fileResolver.close(res -> { fileResolver.close(res -> {


if (workerPoolMetrics != null) { workerPool.close();
workerPoolMetrics.close(); internalBlockingPool.close();
}
workerPool.shutdownNow();
if (internalBlockingPoolMetrics != null) {
internalBlockingPoolMetrics.close();
}
internalBlockingPool.shutdownNow();


acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
@Override @Override
Expand Down Expand Up @@ -894,8 +882,8 @@ void release() {
synchronized (VertxImpl.this) { synchronized (VertxImpl.this) {
if (--refCount == 0) { if (--refCount == 0) {
releaseWorkerPool(name); releaseWorkerPool(name);
if (workerPoolMetrics != null) { if (metrics != null) {
workerPoolMetrics.close(); metrics.close();
} }
workerExec.shutdownNow(); workerExec.shutdownNow();
} }
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/impl/WorkerContext.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
*/ */
public class WorkerContext extends ContextImpl { public class WorkerContext extends ContextImpl {


public WorkerContext(VertxInternal vertx, WorkerPool workerPool, String deploymentID, public WorkerContext(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID,
JsonObject config, ClassLoader tccl) { JsonObject config, ClassLoader tccl) {
super(vertx, workerPool, deploymentID, config, tccl); super(vertx, internalBlockingPool, workerPool, deploymentID, config, tccl);
} }


@Override @Override
public void executeAsync(Handler<Void> task) { public void executeAsync(Handler<Void> task) {
workerExec.execute(wrapTask(null, task, true, workerPool.workerMetrics)); workerExec.execute(wrapTask(null, task, true, workerPool.metrics()));
} }


@Override @Override
Expand All @@ -53,7 +53,7 @@ protected void checkCorrectThread() {
// so we need to execute it on the worker thread // so we need to execute it on the worker thread
@Override @Override
public void executeFromIO(ContextTask task) { public void executeFromIO(ContextTask task) {
workerExec.execute(wrapTask(task, null, true, workerPool.workerMetrics)); workerExec.execute(wrapTask(task, null, true, workerPool.metrics()));
} }


} }
34 changes: 27 additions & 7 deletions src/main/java/io/vertx/core/impl/WorkerPool.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,19 +19,39 @@
import io.vertx.core.spi.metrics.ThreadPoolMetrics; import io.vertx.core.spi.metrics.ThreadPoolMetrics;


import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;


/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
class WorkerPool { class WorkerPool {


protected final OrderedExecutorFactory workerOrderedFact; private final OrderedExecutorFactory orderedFact;
protected final Executor workerPool; private final ExecutorService pool;
protected final ThreadPoolMetrics workerMetrics; private final ThreadPoolMetrics metrics;


public WorkerPool(Executor workerPool, ThreadPoolMetrics workerMetrics) { public WorkerPool(ExecutorService pool, ThreadPoolMetrics metrics) {
this.workerOrderedFact = new OrderedExecutorFactory(workerPool); this.orderedFact = new OrderedExecutorFactory(pool);
this.workerPool = workerPool; this.pool = pool;
this.workerMetrics = workerMetrics; this.metrics = metrics;
}

ExecutorService executor() {
return pool;
}

Executor createOrderedExecutor() {
return orderedFact.getExecutor();
}

ThreadPoolMetrics metrics() {
return metrics;
}

void close() {
if (metrics != null) {
metrics.close();
}
pool.shutdownNow();
} }
} }

0 comments on commit e69a27b

Please sign in to comment.