Skip to content

Commit

Permalink
Expose io.vertx.core.Context Netty EventLoop so such context can be i…
Browse files Browse the repository at this point in the history
…nvoked by an existing Netty application and the Context will be set correctly by Vert.x
  • Loading branch information
vietj committed Sep 23, 2015
1 parent d19a953 commit 8f87eb3
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 9 deletions.
11 changes: 11 additions & 0 deletions src/main/java/io/vertx/core/Context.java
Expand Up @@ -16,6 +16,8 @@


package io.vertx.core; package io.vertx.core;


import io.netty.channel.EventLoop;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen; import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -215,4 +217,13 @@ static boolean isOnVertxThread() {
*/ */
int getInstanceCount(); int getInstanceCount();


/**
* Return the Netty EventLoop used by this Context. This can be used to integrate
* a Netty Server with a Vert.x runtime, specially the Context part.
*
* @return the EventLoop
*/
@GenIgnore
EventLoop nettyEventLoop();

} }
Expand Up @@ -61,7 +61,7 @@ public DatagramSocketImpl(VertxInternal vertx, DatagramSocketOptions options) {
throw new IllegalStateException("Cannot use DatagramSocket in a multi-threaded worker verticle"); throw new IllegalStateException("Cannot use DatagramSocket in a multi-threaded worker verticle");
} }
channel().config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); channel().config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
context.eventLoop().register(channel); context.nettyEventLoop().register(channel);
channel.pipeline().addLast("handler", new DatagramServerHandler(this)); channel.pipeline().addLast("handler", new DatagramServerHandler(this));
channel().config().setMaxMessagesPerRead(1); channel().config().setMaxMessagesPerRead(1);
} }
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java
Expand Up @@ -80,7 +80,7 @@ public DnsClientImpl(VertxInternal vertx, int port, String host) {


actualCtx = vertx.getOrCreateContext(); actualCtx = vertx.getOrCreateContext();
bootstrap = new Bootstrap(); bootstrap = new Bootstrap();
bootstrap.group(actualCtx.eventLoop()); bootstrap.group(actualCtx.nettyEventLoop());
bootstrap.channel(NioDatagramChannel.class); bootstrap.channel(NioDatagramChannel.class);
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.handler(new ChannelInitializer<DatagramChannel>() { bootstrap.handler(new ChannelInitializer<DatagramChannel>() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -639,7 +639,7 @@ private void applyConnectionOptions(Bootstrap bootstrap) {
private void internalConnect(ContextImpl context, int port, String host, Handler<ClientConnection> connectHandler, private void internalConnect(ContextImpl context, int port, String host, Handler<ClientConnection> connectHandler,
Handler<Throwable> connectErrorHandler, ConnectionLifeCycleListener listener) { Handler<Throwable> connectErrorHandler, ConnectionLifeCycleListener listener) {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.eventLoop()); bootstrap.group(context.nettyEventLoop());
bootstrap.channelFactory(new VertxNioSocketChannelFactory()); bootstrap.channelFactory(new VertxNioSocketChannelFactory());
sslHelper.validate(vertx); sslHelper.validate(vertx);
bootstrap.handler(new ChannelInitializer<Channel>() { bootstrap.handler(new ChannelInitializer<Channel>() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -247,7 +247,7 @@ public List<String> processArgs() {
return processArgument != null ? processArgument : Starter.PROCESS_ARGS; return processArgument != null ? processArgument : Starter.PROCESS_ARGS;
} }


public EventLoop eventLoop() { public EventLoop nettyEventLoop() {
return eventLoop; return eventLoop;
} }


Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/EventLoopContext.java
Expand Up @@ -36,7 +36,7 @@ public EventLoopContext(VertxInternal vertx, Executor internalBlockingExec, Exec
} }


public void executeAsync(Handler<Void> task) { public void executeAsync(Handler<Void> task) {
eventLoop().execute(wrapTask(null, task, true)); nettyEventLoop().execute(wrapTask(null, task, true));
} }


@Override @Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -708,7 +708,7 @@ boolean cancel() {
this.timerID = timerID; this.timerID = timerID;
this.handler = runnable; this.handler = runnable;
this.periodic = periodic; this.periodic = periodic;
EventLoop el = context.eventLoop(); EventLoop el = context.nettyEventLoop();
Runnable toRun = () -> context.runOnContext(this); Runnable toRun = () -> context.runOnContext(this);
if (periodic) { if (periodic) {
future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS); future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/impl/HandlerManager.java
Expand Up @@ -56,7 +56,7 @@ public HandlerHolder<T> chooseHandler(EventLoop worker) {
} }


public synchronized void addHandler(Handler<T> handler, ContextImpl context) { public synchronized void addHandler(Handler<T> handler, ContextImpl context) {
EventLoop worker = context.eventLoop(); EventLoop worker = context.nettyEventLoop();
availableWorkers.addWorker(worker); availableWorkers.addWorker(worker);
Handlers<T> handlers = new Handlers<>(); Handlers<T> handlers = new Handlers<>();
Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers); Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);
Expand All @@ -68,7 +68,7 @@ public synchronized void addHandler(Handler<T> handler, ContextImpl context) {
} }


public synchronized void removeHandler(Handler<T> handler, ContextImpl context) { public synchronized void removeHandler(Handler<T> handler, ContextImpl context) {
EventLoop worker = context.eventLoop(); EventLoop worker = context.nettyEventLoop();
Handlers<T> handlers = handlerMap.get(worker); Handlers<T> handlers = handlerMap.get(worker);
if (!handlers.removeHandler(new HandlerHolder<>(context, handler))) { if (!handlers.removeHandler(new HandlerHolder<>(context, handler))) {
throw new IllegalStateException("Can't find handler"); throw new IllegalStateException("Can't find handler");
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -153,7 +153,7 @@ private void connect(int port, String host, Handler<AsyncResult<NetSocket>> conn
ContextImpl context = vertx.getOrCreateContext(); ContextImpl context = vertx.getOrCreateContext();
sslHelper.validate(vertx); sslHelper.validate(vertx);
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.eventLoop()); bootstrap.group(context.nettyEventLoop());
bootstrap.channel(NioSocketChannel.class); bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() { bootstrap.handler(new ChannelInitializer<Channel>() {
@Override @Override
Expand Down
79 changes: 79 additions & 0 deletions src/test/java/io/vertx/test/core/EventLoopGroupTest.java
Expand Up @@ -16,9 +16,28 @@


package io.vertx.test.core; package io.vertx.test.core;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import org.junit.Test; import org.junit.Test;


import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** /**
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
*/ */
Expand All @@ -31,4 +50,64 @@ public void testGetEventLoopGroup() {
assertNotNull(elp); assertNotNull(elp);


} }

@Test
public void testNettyServerUsesContextEventLoop() throws Exception {
Context context = vertx.getOrCreateContext();
AtomicReference<Thread> contextThread = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
context.runOnContext(v -> {
contextThread.set(Thread.currentThread());
latch.countDown();
});
awaitLatch(latch);
ServerBootstrap bs = new ServerBootstrap();
bs.group(context.nettyEventLoop());
bs.channel(NioServerSocketChannel.class);
bs.option(ChannelOption.SO_BACKLOG, 100);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
assertEquals("hello", buf.toString(StandardCharsets.UTF_8));
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
assertSame(contextThread.get(), Thread.currentThread());
assertSame(context, Vertx.currentContext());
testComplete();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
fail(cause.getMessage());
}
});
}
});
bs.bind("localhost", 1234).sync();
vertx.createNetClient(new NetClientOptions()).connect(1234, "localhost", ar -> {
assertTrue(ar.succeeded());
NetSocket so = ar.result();
so.write(Buffer.buffer("hello"));
});
await();
}
} }

0 comments on commit 8f87eb3

Please sign in to comment.