Skip to content

Commit

Permalink
Make the Netty/Context integration work as expected
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 25, 2015
1 parent 7ef9592 commit 9fb801d
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 46 deletions.
9 changes: 0 additions & 9 deletions src/main/java/io/vertx/core/Context.java
Expand Up @@ -217,13 +217,4 @@ static boolean isOnVertxThread() {
*/ */
int getInstanceCount(); int getInstanceCount();


/**
* Return the Netty EventLoop used by this Context. This can be used to integrate
* a Netty Server with a Vert.x runtime, specially the Context part.
*
* @return the EventLoop
*/
@GenIgnore
EventLoop nettyEventLoop();

} }
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -37,7 +37,7 @@
/** /**
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
*/ */
public abstract class ContextImpl implements Context { public abstract class ContextImpl implements ContextInternal {


private static final Logger log = LoggerFactory.getLogger(ContextImpl.class); private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);


Expand Down Expand Up @@ -305,7 +305,11 @@ private <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCo


protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread) { protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread) {
return () -> { return () -> {
VertxThread current = getCurrentThread(); Thread th = Thread.currentThread();
if (!(th instanceof VertxThread)) {
throw new IllegalStateException("Uh oh! Event loop context executing with wrong thread! Expected " + contextThread + " got " + th);
}
VertxThread current = (VertxThread) th;
if (THREAD_CHECKS && checkThread) { if (THREAD_CHECKS && checkThread) {
if (contextThread == null) { if (contextThread == null) {
contextThread = current; contextThread = current;
Expand Down Expand Up @@ -335,10 +339,6 @@ protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean chec
}; };
} }


private VertxThread getCurrentThread() {
return (VertxThread) Thread.currentThread();
}

private void setTCCL() { private void setTCCL() {
Thread.currentThread().setContextClassLoader(tccl); Thread.currentThread().setContextClassLoader(tccl);
} }
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.impl;

import io.netty.channel.EventLoop;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

/**
* This interface provides an api for vert.x core internal use only
* It is not part of the public API and should not be used by
* developers creating vert.x applications
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface ContextInternal extends Context {

/**
* Return the Netty EventLoop used by this Context. This can be used to integrate
* a Netty Server with a Vert.x runtime, specially the Context part.
*
* @return the EventLoop
*/
EventLoop nettyEventLoop();

/**
* Execute the context task and switch on this context if necessary, this also associates the
* current thread with the current context so {@link Vertx#currentContext()} returns this context.<p/>
*
* The caller thread should be the the event loop thread of this context.
*
* @param task the task to execute
*/
void executeFromIO(ContextTask task);
}
60 changes: 60 additions & 0 deletions src/test/java/io/vertx/test/core/ContextTest.java
Expand Up @@ -16,10 +16,19 @@


package io.vertx.test.core; package io.vertx.test.core;


import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.ContextInternal;
import org.junit.Test; import org.junit.Test;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** /**
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
*/ */
Expand Down Expand Up @@ -118,4 +127,55 @@ public void testExecuteUnorderedBlocking() throws Exception {
}); });
await(); await();
} }

@Test
public void testEventLoopExecuteFromIo() throws Exception {
ContextInternal eventLoopContext = (ContextInternal) vertx.getOrCreateContext();

// Check from other thread
try {
eventLoopContext.executeFromIO(this::fail);
fail();
} catch (IllegalStateException expected) {
}

// Check from event loop thread
eventLoopContext.nettyEventLoop().execute(() -> {
// Should not be set yet
assertNull(Vertx.currentContext());
Thread vertxThread = Thread.currentThread();
AtomicBoolean nested = new AtomicBoolean(true);
eventLoopContext.executeFromIO(() -> {
assertTrue(nested.get());
assertSame(eventLoopContext, Vertx.currentContext());
assertSame(vertxThread, Thread.currentThread());
});
nested.set(false);
testComplete();
});
await();
}

@Test
public void testWorkerExecuteFromIo() throws Exception {
AtomicReference<ContextInternal> workerContext = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
workerContext.set((ContextInternal) context);
latch.countDown();
}
}, new DeploymentOptions().setWorker(true));
awaitLatch(latch);
workerContext.get().nettyEventLoop().execute(() -> {
assertNull(Vertx.currentContext());
workerContext.get().executeFromIO(() -> {
assertSame(workerContext.get(), Vertx.currentContext());
assertTrue(Context.isOnWorkerThread());
testComplete();
});
});
await();
}
} }
78 changes: 47 additions & 31 deletions src/test/java/io/vertx/test/core/EventLoopGroupTest.java
Expand Up @@ -30,6 +30,7 @@
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
import org.junit.Test; import org.junit.Test;
Expand All @@ -53,7 +54,7 @@ public void testGetEventLoopGroup() {


@Test @Test
public void testNettyServerUsesContextEventLoop() throws Exception { public void testNettyServerUsesContextEventLoop() throws Exception {
Context context = vertx.getOrCreateContext(); ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
AtomicReference<Thread> contextThread = new AtomicReference<>(); AtomicReference<Thread> contextThread = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
context.runOnContext(v -> { context.runOnContext(v -> {
Expand All @@ -69,36 +70,51 @@ public void testNettyServerUsesContextEventLoop() throws Exception {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
assertSame(contextThread.get(), Thread.currentThread()); assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext()); context.executeFromIO(() -> {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { assertSame(contextThread.get(), Thread.currentThread());
@Override assertSame(context, Vertx.currentContext());
public void channelActive(ChannelHandlerContext ctx) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
assertSame(contextThread.get(), Thread.currentThread()); @Override
assertSame(context, Vertx.currentContext()); public void channelActive(ChannelHandlerContext ctx) throws Exception {
} assertSame(contextThread.get(), Thread.currentThread());
@Override context.executeFromIO(() -> {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { assertSame(contextThread.get(), Thread.currentThread());
ByteBuf buf = (ByteBuf) msg; assertSame(context, Vertx.currentContext());
assertEquals("hello", buf.toString(StandardCharsets.UTF_8)); });
assertSame(contextThread.get(), Thread.currentThread()); }
assertSame(context, Vertx.currentContext()); @Override
} public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@Override ByteBuf buf = (ByteBuf) msg;
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { assertEquals("hello", buf.toString(StandardCharsets.UTF_8));
assertSame(contextThread.get(), Thread.currentThread()); assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext()); context.executeFromIO(() -> {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); assertSame(contextThread.get(), Thread.currentThread());
} assertSame(context, Vertx.currentContext());
@Override });
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
assertSame(contextThread.get(), Thread.currentThread()); @Override
assertSame(context, Vertx.currentContext()); public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
testComplete(); assertSame(contextThread.get(), Thread.currentThread());
} context.executeFromIO(() -> {
@Override assertSame(contextThread.get(), Thread.currentThread());
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { assertSame(context, Vertx.currentContext());
fail(cause.getMessage()); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
} });
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
context.executeFromIO(() -> {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
testComplete();
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
fail(cause.getMessage());
}
});
}); });
} }
}); });
Expand Down

0 comments on commit 9fb801d

Please sign in to comment.