Skip to content

Commit

Permalink
Cannot process buffer in worker NetClient on channel inactive - fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 27, 2017
1 parent 4855ffb commit 81d561f
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 10 deletions.
Expand Up @@ -618,7 +618,7 @@ public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exceptio
}

@Override
protected void handleMsgReceived(Object msg) {
protected void handleMsgReceived(NetSocketImpl conn, Object msg) {
ByteBuf buf = (ByteBuf) msg;
conn.handleDataReceived(Buffer.buffer(buf));
}
Expand Down
Expand Up @@ -255,7 +255,7 @@ public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exceptio
}

@Override
protected void handleMsgReceived(Object msg) {
protected void handleMsgReceived(NetSocketImpl conn, Object msg) {
ByteBuf buf = (ByteBuf) msg;
conn.handleDataReceived(Buffer.buffer(buf));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -192,7 +192,7 @@ private void connect(int port, String host, Handler<AsyncResult<NetSocket>> conn
}
pipeline.addLast("handler", new VertxNetHandler<NetSocketImpl>(ch, socketMap) {
@Override
protected void handleMsgReceived(Object msg) {
protected void handleMsgReceived(NetSocketImpl conn, Object msg) {
ByteBuf buf = (ByteBuf) msg;
conn.handleDataReceived(Buffer.buffer(buf));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetServerBase.java
Expand Up @@ -304,7 +304,7 @@ public ServerHandler(Channel ch) {
}

@Override
protected void handleMsgReceived(Object msg) {
protected void handleMsgReceived(C conn, Object msg) {
NetServerBase.this.handleMsgReceived(conn, msg);
}

Expand Down
11 changes: 5 additions & 6 deletions src/main/java/io/vertx/core/net/impl/VertxNetHandler.java
Expand Up @@ -31,7 +31,7 @@ public abstract class VertxNetHandler<C extends ConnectionBase> extends VertxHan

private final Channel ch;
private final Map<Channel, C> connectionMap;
protected C conn;
C conn;

public VertxNetHandler(Channel ch, Map<Channel, C> connectionMap) {
this.ch = ch;
Expand All @@ -57,17 +57,16 @@ protected C removeConnection() {
return conn;
}


@Override
protected void channelRead(C sock, ContextImpl context, ChannelHandlerContext chctx, Object msg) throws Exception {
if (sock != null) {
context.executeFromIO(() -> handleMsgReceived(msg));
protected void channelRead(C conn, ContextImpl context, ChannelHandlerContext chctx, Object msg) throws Exception {
if (conn != null) {
context.executeFromIO(() -> handleMsgReceived(conn, msg));
} else {
// just discard
}
}

protected abstract void handleMsgReceived(Object msg);
protected abstract void handleMsgReceived(C conn, Object msg);

@Override
protected Object safeObject(Object msg, ByteBufAllocator allocator) throws Exception {
Expand Down
81 changes: 81 additions & 0 deletions src/test/java/io/vertx/test/core/NetTest.java
Expand Up @@ -21,6 +21,7 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -2730,4 +2731,84 @@ public void testSelfSignedCertificate() throws Exception {

awaitLatch(latch);
}

@Test
public void testWorkerClient() throws Exception {
String expected = TestUtils.randomAlphaString(2000);
server.connectHandler(so -> {
so.write(expected).close();
});
startServer();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
NetClient client = vertx.createNetClient();
client.connect(1234, "localhost", onSuccess(so ->{
Buffer received = Buffer.buffer();
so.handler(received::appendBuffer);
so.closeHandler(v -> {
assertEquals(expected, received.toString());
testComplete();
});
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));

}
}, new DeploymentOptions().setWorker(true));
await();
}

@Test
public void testWorkerServer() throws Exception {
String expected = TestUtils.randomAlphaString(2000);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start(Future<Void> startFuture) throws Exception {
NetServer server = vertx.createNetServer();
server.connectHandler(so -> {
Buffer received = Buffer.buffer();
so.handler(received::appendBuffer);
so.closeHandler(v -> {
assertEquals(expected, received.toString());
testComplete();
});
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
server.listen(1234, ar -> startFuture.handle(ar.mapEmpty()));
}
}, new DeploymentOptions().setWorker(true), onSuccess(v -> {
client.connect(1234, "localhost", onSuccess(so -> {
so.write(expected).close();
}));
}));
await();
}

protected void startServer() throws Exception {
startServer(vertx.getOrCreateContext());
}

protected void startServer(NetServer server) throws Exception {
startServer(vertx.getOrCreateContext(), server);
}

protected void startServer(Context context) throws Exception {
startServer(context, server);
}

protected void startServer(Context context, NetServer server) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
context.runOnContext(v -> {
server.listen(onSuccess(s -> latch.countDown()));
});
awaitLatch(latch);
}
}

0 comments on commit 81d561f

Please sign in to comment.