Skip to content

Commit

Permalink
Provide support for NetClient as well
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 5, 2017
1 parent fe0d24d commit a252e3f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/main/java/io/vertx/core/impl/NetSocketInternal.java
Expand Up @@ -15,8 +15,9 @@
*/
package io.vertx.core.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.net.NetSocket;

Expand All @@ -29,6 +30,8 @@ public interface NetSocketInternal extends NetSocket {

NetSocketInternal writeMessage(Object message);

NetSocketInternal writeMessage(Object message, Handler<AsyncResult<Void>> handler);

NetSocketInternal messageHandler(Handler<Object> handler);

}
3 changes: 1 addition & 2 deletions src/main/java/io/vertx/core/net/impl/NetClientBase.java
Expand Up @@ -178,12 +178,10 @@ protected void doConnect(int port, String host, String serverName, Handler<Async
}

Handler<Channel> channelInitializer = ch -> {
ChannelPipeline pipeline = ch.pipeline();
if (sslHelper.isSSL()) {
SslHandler sslHandler = new SslHandler(sslHelper.createEngine(vertx, host, port, serverName));
ch.pipeline().addLast("ssl", sslHandler);
}
initChannel(pipeline);
};

Handler<AsyncResult<Channel>> channelHandler = res -> {
Expand Down Expand Up @@ -229,6 +227,7 @@ private void connected(ContextImpl context, Channel ch, Handler<AsyncResult<C>>

// Need to set context before constructor is called as writehandler registration needs this
ContextImpl.setContext(context);
initChannel(ch.pipeline());

VertxNetHandler<C> handler = new VertxNetHandler<C>(ch, ctx -> createConnection(vertx, ctx, host, port, context, sslHelper, metrics)) {
@Override
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -98,8 +98,7 @@ protected NetSocketImpl createConnection(VertxInternal vertx, ChannelHandlerCont

@Override
protected void handleMsgReceived(NetSocketImpl conn, Object msg) {
ByteBuf buf = (ByteBuf) msg;
conn.handleDataReceived(Buffer.buffer(buf));
conn.handleMessageReceived(msg);
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/net/impl/NetServerBase.java
Expand Up @@ -323,7 +323,9 @@ private void connected(Channel ch) {
}
// Need to set context before constructor is called as writehandler registration needs this
ContextImpl.setContext(handler.context);

NetServerBase.this.initChannel(ch.pipeline());

VertxNetHandler<C> nh = new VertxNetHandler<C>(ch, ctx -> createConnection(vertx, ctx, handler.context, sslHelper, metrics)) {
@Override
protected void handleMessage(C connection, ContextImpl context, ChannelHandlerContext chctx, Object msg) throws Exception {
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/vertx/core/net/impl/NetSocketImpl.java
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -113,6 +114,23 @@ public NetSocketInternal writeMessage(Object message) {
return this;
}

@Override
public NetSocketInternal writeMessage(Object message, Handler<AsyncResult<Void>> handler) {
ChannelPromise promise = chctx.newPromise();
super.writeToChannel(message, promise);
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handler.handle(Future.succeededFuture());
} else {
handler.handle(Future.failedFuture(future.cause()));
}
}
});
return this;
}

@Override
public NetSocket write(Buffer data) {
ByteBuf buf = data.getByteBuf();
Expand Down
55 changes: 53 additions & 2 deletions src/test/java/io/vertx/test/core/NetTest.java
Expand Up @@ -18,12 +18,18 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
Expand All @@ -43,6 +49,8 @@
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.EventLoopContext;
Expand Down Expand Up @@ -3049,12 +3057,12 @@ public void start(Future<Void> startFuture) throws Exception {
}

@Test
public void testNetInternal() throws Exception {
public void testNetServerInternal() throws Exception {
server.connectHandler(so -> {
NetSocketInternal internal = (NetSocketInternal) so;
ChannelHandlerContext chctx = internal.channelHandlerContext();
ChannelPipeline pipeline = chctx.pipeline();
pipeline.addFirst(new HttpServerCodec());
pipeline.addBefore("handler", "http", new HttpServerCodec());
internal.messageHandler(obj -> {
if (obj instanceof LastHttpContent) {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
Expand All @@ -3079,6 +3087,49 @@ public void testNetInternal() throws Exception {
await();
}

@Test
public void testNetClientInternal() throws Exception {
waitFor(2);
HttpServer server = vertx.createHttpServer(new HttpServerOptions().setHost("localhost").setPort(1234));
server.requestHandler(req -> {
req.response().end("Hello World"); });
CountDownLatch latch = new CountDownLatch(1);
server.listen(onSuccess(v -> {
latch.countDown();
}));
awaitLatch(latch);
client.close();
client = vertx.createNetClient(new NetClientOptions().setTcpNoDelay(true));
client.connect(1234, "localhost", onSuccess(so -> {
NetSocketInternal soInt = (NetSocketInternal) so;
ChannelHandlerContext chctx = soInt.channelHandlerContext();
ChannelPipeline pipeline = chctx.pipeline();
pipeline.addBefore("handler", "http", new HttpClientCodec());
AtomicInteger status = new AtomicInteger();
soInt.messageHandler(obj -> {
switch (status.getAndIncrement()) {
case 0:
assertTrue(obj instanceof HttpResponse);
HttpResponse resp = (HttpResponse) obj;
assertEquals(200, resp.status().code());
break;
case 1:
assertTrue(obj instanceof LastHttpContent);
assertEquals("Hello World", ((LastHttpContent)obj).content().toString(StandardCharsets.UTF_8));
complete();
break;
default:
fail();
}
});
soInt.handler(buff -> fail());
soInt.writeMessage(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/somepath"), onSuccess(v -> {
complete();
}));
}));
await();
}

protected void startServer() throws Exception {
startServer(vertx.getOrCreateContext());
}
Expand Down

0 comments on commit a252e3f

Please sign in to comment.