Skip to content

Commit

Permalink
A few fixes + improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 28, 2016
1 parent 14e5eba commit f31dd91
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 66 deletions.
49 changes: 45 additions & 4 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -27,6 +27,7 @@

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,7 +46,7 @@ public abstract class ContextImpl implements ContextInternal {
private static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME);
private static final boolean DISABLE_TCCL = Boolean.getBoolean(DISABLE_TCCL_PROP_NAME);

protected final VertxInternal owner;
protected final VertxImpl owner;
protected final String deploymentID;
protected final JsonObject config;
private Deployment deployment;
Expand All @@ -57,6 +58,8 @@ public abstract class ContextImpl implements ContextInternal {
private Map<String, Object> contextData;
private volatile Handler<Throwable> exceptionHandler;
protected final WorkerPool workerPool;
protected final Executor orderedInternalPoolExec;
protected final Executor workerExec;

protected ContextImpl(VertxInternal vertx, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
Expand All @@ -72,9 +75,11 @@ protected ContextImpl(VertxInternal vertx, WorkerPool workerPool, String deploym
this.eventLoop = null;
}
this.tccl = tccl;
this.owner = vertx;
this.owner = (VertxImpl) vertx;
this.exceptionHandler = vertx.exceptionHandler();
this.workerPool = workerPool;
this.orderedInternalPoolExec = owner.internalOrderedFact.getExecutor();
this.workerExec = workerPool.workerOrderedFact.getExecutor();
}

public static void setContext(ContextImpl context) {
Expand Down Expand Up @@ -262,19 +267,55 @@ public Vertx owner() {

// Execute an internal task on the internal blocking ordered executor
public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
workerPool.executeBlocking(this, action, null, true, true, resultHandler);
executeBlocking(action, null, resultHandler, orderedInternalPoolExec, owner.internalBlockingPoolMetrics);
}

@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
workerPool.executeBlocking(this, null, blockingCodeHandler, false, ordered, resultHandler);
executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.workerPool, workerPool.workerMetrics);
}

@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
}

<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, ThreadPoolMetrics metrics) {
Object metric = metrics != null ? metrics.taskSubmitted() : null;
try {
exec.execute(() -> {
if (metrics != null) {
metrics.taskExecuting(metric);
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
}
if (metrics != null) {
metrics.taskCompleted(metric, res.succeeded());
}
if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler));
}
});
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
if (metrics != null) {
metrics.taskRejected(metric);
}
}
}

protected synchronized Map<String, Object> contextData() {
if (contextData == null) {
contextData = new ConcurrentHashMap<>();
Expand Down
Expand Up @@ -31,7 +31,7 @@ public MultiThreadedWorkerContext(VertxInternal vertx, WorkerPool workerPool,

@Override
public void executeAsync(Handler<Void> task) {
workerPool.workerExec.execute(wrapTask(null, task, false, workerPool.workerMetrics));
workerPool.workerPool.execute(wrapTask(null, task, false, workerPool.workerMetrics));
}

@Override
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/io/vertx/core/impl/NamedWorkerExecutor.java
Expand Up @@ -22,6 +22,8 @@
import io.vertx.core.Handler;
import io.vertx.core.WorkerExecutor;

import java.util.concurrent.Executor;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
Expand All @@ -30,10 +32,12 @@ class NamedWorkerExecutor implements WorkerExecutor, Closeable {
private final ContextImpl context;
final VertxImpl.NamedWorkerPool pool;
private boolean closed;
private final Executor workerExec;

public NamedWorkerExecutor(ContextImpl context, VertxImpl.NamedWorkerPool pool) {
this.pool = pool;
this.context = context;
this.workerExec = pool.workerOrderedFact.getExecutor();
}

public WorkerPool getPool() {
Expand All @@ -44,7 +48,7 @@ public synchronized <T> void executeBlocking(Handler<Future<T>> blockingCodeHand
if (closed) {
throw new IllegalStateException("Worker executor closed");
}
pool.executeBlocking(context, null, blockingCodeHandler, false, ordered, asyncResultHandler);
context.executeBlocking(null, blockingCodeHandler, asyncResultHandler, ordered ? workerExec : pool.workerPool, pool.workerMetrics);
}

public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> asyncResultHandler) {
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -98,9 +98,9 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final ExecutorService workerPool;
private final ThreadPoolMetrics workerPoolMetrics;
private final ExecutorService internalBlockingPool;
private final ThreadPoolMetrics internalBlockingPoolMetrics;
final ThreadPoolMetrics internalBlockingPoolMetrics;
private final OrderedExecutorFactory workerOrderedFact;
private final OrderedExecutorFactory internalOrderedFact;
final OrderedExecutorFactory internalOrderedFact;
private final ThreadFactory eventLoopThreadFactory;
private final NioEventLoopGroup eventLoopGroup;
private final NioEventLoopGroup acceptorEventLoopGroup;
Expand Down Expand Up @@ -147,10 +147,10 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
workerOrderedFact = new OrderedExecutorFactory(workerPool);
internalOrderedFact = new OrderedExecutorFactory(internalBlockingPool);
namedWorkerPools = new HashMap<>();
vertxWorkerPool = new WorkerPool(internalOrderedFact.getExecutor(), workerOrderedFact.getExecutor(), workerPool, internalBlockingPoolMetrics, workerPoolMetrics);
vertxWorkerPool = new WorkerPool(workerPool, workerPoolMetrics);
workerOrderedFact = vertxWorkerPool.workerOrderedFact;
defaultWorkerPoolSize = options.getWorkerPoolSize();
defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime();

Expand Down Expand Up @@ -880,13 +880,12 @@ public synchronized TimeoutStream endHandler(Handler<Void> endHandler) {

class NamedWorkerPool extends WorkerPool {

final ExecutorService workerExec;
private final ExecutorService workerExec;
final String name;
private int refCount = 1;

public NamedWorkerPool(String name, ExecutorService workerExec, ThreadPoolMetrics workerMetrics) {
super(internalOrderedFact.getExecutor(), new OrderedExecutorFactory(workerExec).getExecutor(), workerExec,
VertxImpl.this.internalBlockingPoolMetrics, workerMetrics);
super(workerExec, workerMetrics);
this.workerExec = workerExec;
this.name = name;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/WorkerContext.java
Expand Up @@ -31,7 +31,7 @@ public WorkerContext(VertxInternal vertx, WorkerPool workerPool, String deployme

@Override
public void executeAsync(Handler<Void> task) {
workerPool.workerExec.execute(wrapTask(null, task, true, workerPool.workerMetrics));
workerExec.execute(wrapTask(null, task, true, workerPool.workerMetrics));
}

@Override
Expand All @@ -53,7 +53,7 @@ protected void checkCorrectThread() {
// so we need to execute it on the worker thread
@Override
public void executeFromIO(ContextTask task) {
workerPool.workerExec.execute(wrapTask(task, null, true, workerPool.workerMetrics));
workerExec.execute(wrapTask(task, null, true, workerPool.workerMetrics));
}

}
56 changes: 5 additions & 51 deletions src/main/java/io/vertx/core/impl/WorkerPool.java
Expand Up @@ -16,68 +16,22 @@

package io.vertx.core.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.spi.metrics.ThreadPoolMetrics;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class WorkerPool {

protected final ThreadPoolMetrics workerMetrics;
protected final ThreadPoolMetrics internalBlockingPoolMetrics;
protected final Executor orderedInternalPoolExec;
protected final Executor workerExec;
protected final OrderedExecutorFactory workerOrderedFact;
protected final Executor workerPool;
protected final ThreadPoolMetrics workerMetrics;

WorkerPool(Executor orderedInternalPoolExec, Executor workerExec, Executor workerPool,
ThreadPoolMetrics internalBlockingPoolMetrics, ThreadPoolMetrics workerMetrics) {
this.workerMetrics = workerMetrics;
this.internalBlockingPoolMetrics = internalBlockingPoolMetrics;
this.orderedInternalPoolExec = orderedInternalPoolExec;
this.workerExec = workerExec;
public WorkerPool(Executor workerPool, ThreadPoolMetrics workerMetrics) {
this.workerOrderedFact = new OrderedExecutorFactory(workerPool);
this.workerPool = workerPool;
}

<T> void executeBlocking(ContextImpl context, Action<T> action, Handler<Future<T>> blockingCodeHandler, boolean internal,
boolean ordered, Handler<AsyncResult<T>> resultHandler) {
ThreadPoolMetrics metrics = internal ? internalBlockingPoolMetrics : workerMetrics;
Object metric = metrics != null ? metrics.taskSubmitted() : null;
try {
Executor exec = internal ? orderedInternalPoolExec : (ordered ? workerExec : workerPool);
exec.execute(() -> {
if (metrics != null) {
metrics.taskExecuting(metric);
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(context);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
}
if (metrics != null) {
metrics.taskCompleted(metric, res.succeeded());
}
if (resultHandler != null) {
context.runOnContext(v -> res.setHandler(resultHandler));
}
});
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
if (metrics != null) {
metrics.taskRejected(metric);
}
}
this.workerMetrics = workerMetrics;
}
}

0 comments on commit f31dd91

Please sign in to comment.