Skip to content

Commit

Permalink
Provide a Context#executeFromIO(T,Handler<T>) that allows to pass an …
Browse files Browse the repository at this point in the history
…argument to the handler to avoid unnecessary capturing lambda creation
  • Loading branch information
vietj committed Apr 5, 2018
1 parent b8a5ca7 commit 61fd10a
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 55 deletions.
90 changes: 49 additions & 41 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -176,11 +176,16 @@ static boolean isOnVertxThread(boolean worker) {
// but check this anyway, then execute directly
@Override
public void executeFromIO(Handler<Void> task) {
executeFromIO(null, task);
}

@Override
public <T> void executeFromIO(T value, Handler<T> task) {
if (THREAD_CHECKS) {
checkCorrectThread();
}
// No metrics on this, as we are on the event loop.
wrapTask(task, true, null).run();
executeTask(value, task, true);
}

protected abstract void checkCorrectThread();
Expand Down Expand Up @@ -294,55 +299,58 @@ public synchronized ConcurrentMap<Object, Object> contextData() {
return contextData;
}

protected Runnable wrapTask(Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
protected <T> Runnable wrapTask(T arg, Handler<T> hTask, boolean checkThread, PoolMetrics metrics) {
Object metric = metrics != null ? metrics.submitted() : null;
return () -> {
Thread th = Thread.currentThread();
if (!(th instanceof VertxThread)) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + th);
}
VertxThread current = (VertxThread) th;
if (THREAD_CHECKS && checkThread) {
if (contextThread == null) {
contextThread = current;
} else if (contextThread != current && !contextThread.isWorker()) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + current);
}
}
if (metrics != null) {
metrics.begin(metric);
}
if (!DISABLE_TIMINGS) {
current.executeStart();
}
try {
setContext(current, ContextImpl.this);
hTask.handle(null);
if (metrics != null) {
metrics.end(metric, true);
}
} catch (Throwable t) {
log.error("Unhandled exception", t);
Handler<Throwable> handler = this.exceptionHandler;
if (handler == null) {
handler = owner.exceptionHandler();
}
if (handler != null) {
handler.handle(t);
}
if (metrics != null) {
metrics.end(metric, false);
}
} finally {
// We don't unset the context after execution - this is done later when the context is closed via
// VertxThreadFactory
if (!DISABLE_TIMINGS) {
current.executeEnd();
}
boolean succeeded = executeTask(arg, hTask, checkThread);
if (metrics != null) {
metrics.end(metric, succeeded);
}
};
}

protected <T> boolean executeTask(T arg, Handler<T> hTask, boolean checkThread) {
Thread th = Thread.currentThread();
if (!(th instanceof VertxThread)) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + th);
}
VertxThread current = (VertxThread) th;
if (THREAD_CHECKS && checkThread) {
if (contextThread == null) {
contextThread = current;
} else if (contextThread != current && !contextThread.isWorker()) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + current);
}
}
if (!DISABLE_TIMINGS) {
current.executeStart();
}
try {
setContext(current, ContextImpl.this);
hTask.handle(arg);
return true;
} catch (Throwable t) {
log.error("Unhandled exception", t);
Handler<Throwable> handler = this.exceptionHandler;
if (handler == null) {
handler = owner.exceptionHandler();
}
if (handler != null) {
handler.handle(t);
}
return false;
} finally {
// We don't unset the context after execution - this is done later when the context is closed via
// VertxThreadFactory
if (!DISABLE_TIMINGS) {
current.executeEnd();
}
}
}

private void setTCCL() {
Thread.currentThread().setContextClassLoader(tccl);
}
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
Expand Up @@ -69,17 +69,23 @@ static boolean isOnVertxThread() {
@Override
VertxInternal owner();

/**
* Like {@link #executeFromIO(Object, Handler)} but with no argument.
*/
void executeFromIO(Handler<Void> task);

/**
* Execute the context task and switch on this context if necessary, this also associates the
* current thread with the current context so {@link Vertx#currentContext()} returns this context.<p/>
*
* The caller thread should be the the event loop thread of this context.<p/>
*
* Any exception thrown from the {@literal stack} will be reported on this context.
* Any exception thrown from the {@literal task} will be reported on this context.
*
* @param task the task to execute
* @param value the argument for the {@code task}
* @param task the task to execute with the {@code value} argument
*/
void executeFromIO(Handler<Void> task);
<T> void executeFromIO(T value, Handler<T> task);

/**
* @return the {@link ConcurrentMap} used to store context data
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/EventLoopContext.java
Expand Up @@ -36,7 +36,7 @@ public EventLoopContext(VertxInternal vertx, EventLoop eventLoop, WorkerPool int

public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(task, true, null));
nettyEventLoop().execute(() -> executeTask(null, task, true));
}

@Override
Expand Down
Expand Up @@ -26,7 +26,7 @@ class MultiThreadedWorkerContext extends WorkerContext {

@Override
public void executeAsync(Handler<Void> task) {
workerPool.executor().execute(wrapTask(task, false, workerPool.metrics()));
workerPool.executor().execute(wrapTask(null, task, false, workerPool.metrics()));
}

@Override
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/vertx/core/impl/WorkerContext.java
Expand Up @@ -26,7 +26,7 @@ class WorkerContext extends ContextImpl {

@Override
public void executeAsync(Handler<Void> task) {
orderedTasks.execute(wrapTask(task, true, workerPool.metrics()), workerPool.executor());
orderedTasks.execute(wrapTask(null, task, true, workerPool.metrics()), workerPool.executor());
}

@Override
Expand All @@ -48,7 +48,11 @@ protected void checkCorrectThread() {
// so we need to execute it on the worker thread
@Override
public void executeFromIO(Handler<Void> task) {
orderedTasks.execute(wrapTask(task, true, workerPool.metrics()), workerPool.executor());
executeFromIO(null, task);
}

@Override
public <T> void executeFromIO(T value, Handler<T> task) {
orderedTasks.execute(wrapTask(value, task, true, workerPool.metrics()), workerPool.executor());
}
}
13 changes: 7 additions & 6 deletions src/main/java/io/vertx/core/net/impl/VertxHandler.java
Expand Up @@ -32,6 +32,7 @@ public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDu
private Handler<Void> endReadAndFlush;
private Handler<C> addHandler;
private Handler<C> removeHandler;
private Handler<Object> messageHandler;

/**
* Set the connection, this is usually called by subclasses when the channel is added to the pipeline.
Expand All @@ -44,6 +45,10 @@ protected void setConnection(C connection) {
if (addHandler != null) {
addHandler.handle(connection);
}
messageHandler = m -> {
conn.startRead();
handleMessage(conn, m);
};
}

/**
Expand Down Expand Up @@ -138,12 +143,8 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
Object message = decode(msg, chctx.alloc());
ContextInternal context;
context = conn.getContext();
context.executeFromIO(v -> {
conn.startRead();
handleMessage(conn, message);
});
ContextInternal ctx = conn.getContext();
ctx.executeFromIO(message, messageHandler);
}

@Override
Expand Down
Expand Up @@ -31,7 +31,7 @@ public BenchmarkContext(VertxInternal vertx, WorkerPool internalBlockingPool, Wo

@Override
protected void executeAsync(Handler<Void> task) {
wrapTask(task, true, null).run();
executeTask(null, task, true);
}

@Override
Expand Down

0 comments on commit 61fd10a

Please sign in to comment.